smv package¶
Subpackages¶
Submodules¶
smv.csv_attributes module¶
SMV User Run Configuration Parameters
This module defines the SmvRunConfig class, which can be mixed into an SmvModule to get user configuration parameters at run-time.
smv.datasetmgr module¶
DataSetMgr entry class. This module provides the Python entry point to DataSetMgr on the Scala side.
class smv.datasetmgr.DataSetMgr(_jvm, smvconfig)[source]¶
Bases: object
The Python representation of DataSetMgr.
inferDS(*partial_names)[source]¶
Return DSs from a list of partial names
Parameters: *partial_names (str) – list of partial names
Returns: list of SmvGenericModules
Return type: list(SmvGenericModules)
load(*fqns)[source]¶
Load SmvGenericModules for specified FQNs
Parameters: *fqns (str) – list of FQNs as strings
Returns: list of Scala SmvGenericModules (j_ds)
Return type: list(SmvGenericModules)
class smv.datasetmgr.TX(_jvm, resourceFactories, stages)[source]¶
Bases: object
Abstraction of the transaction boundary for loading SmvGenericModules. A TX object instantiates a set of repos when it is itself instantiated and reuses the same repos for all queries. This means that each new TX object reloads the SmvGenericModules from source only once during its lifetime.
NOTE: Once a new TX is created, the well-formedness of the SmvGenericModules provided by the previous TX is no longer guaranteed. In particular, it may become impossible to run modules from the previous TX.
smv.datasetrepo module¶
smv.datasetresolver module¶
class smv.datasetresolver.DataSetResolver(repo)[source]¶
Bases: object
DataSetResolver (DSR) is the entry point through which the DataSetMgr acquires SmvGenericModules. A DSR object represents a single transaction. Each DSR creates a set of DataSetRepos at instantiation. When asked for an SmvGenericModule, the DSR queries the repos for that SmvGenericModule and resolves it. The SmvGenericModule is responsible for resolving itself, given access to the DSR to load/resolve its dependencies. The DSR caches the SmvGenericModules it has already resolved to ensure that any SmvGenericModule is resolved only once.
smv.dqm module¶
SMV DataSet Framework interface
This module defines the abstract classes which form the SmvGenericModule Framework for clients’ projects
smv.dqm.DQMFix(condition, fix, name=None, taskPolicy=None)[source]¶
DQMFix will fix a column with a default value
Example
If “age” is greater than 100, make it 100
>>> DQMFix(col('age') > 100, lit(100).alias('age'), 'age_cap100', FailNone)
Parameters: - condition (Column) – boolean condition that determines when the fix should occur on the records of a DF
- fix (Column) – the fix to use when replacing a value that does not pass the condition
- name (String) – optional parameter for naming the DQMFix. if not specified, defaults to the condition text
- taskPolicy (DQMTaskPolicy) – optional parameter for the DQM policy. if not specified, defaults to FailNone()
Returns: a DQMFix object
Return type: (DQMFix)
smv.dqm.DQMRule(rule, name=None, taskPolicy=None)[source]¶
DQMRule defines a requirement on the records of a DF
Example
Require that the sum of the “a” and “b” columns be less than 100
>>> DQMRule(col('a') + col('b') < 100.0, 'a_b_sum_lt100', FailPercent(0.01))
Parameters: - rule (Column) – boolean condition that defines the requirement on the records of a DF
- name (string) – optional parameter for naming the DQMRule. if not specified, defaults to the rule text
- taskPolicy (DQMTaskPolicy) – optional parameter for the DQM policy. if not specified, defaults to FailNone()
Returns: a DQMRule object
Return type: (DQMRule)
smv.dqm.FailAny()[source]¶
Any rule or fix with FailAny will cause the entire DF to fail
Returns: policy for DQM Task
Return type: (DQMTaskPolicy)
smv.dqm.FailCount(threshold)[source]¶
Tasks with FailCount(n) will fail the DF if the task is triggered >= n times
Parameters: threshold (int) – the threshold after which the DF fails
Returns: policy for DQM Task
Return type: (DQMTaskPolicy)
smv.dqm.FailNone()[source]¶
Tasks with FailNone will not trigger any DF-level policy
Returns: policy for DQM Task
Return type: (DQMTaskPolicy)
smv.dqm.FailParserCountPolicy(threshold)[source]¶
If the total number of parser failures is >= threshold, fail the DF
Parameters: threshold (int) – the threshold after which the DF fails
Returns: policy for DQM
Return type: (DQMPolicy)
smv.dqm.FailPercent(threshold)[source]¶
Tasks with FailPercent(r) will fail the DF if the task is triggered on >= r percent of the total number of records in the DF
Parameters: threshold (double) – the threshold after which the DF fails; value is between 0.0 and 1.0
Returns: policy for DQM Task
Return type: (DQMTaskPolicy)
smv.dqm.FailTotalFixCountPolicy(threshold)[source]¶
For all the fixes in a DQM, if the total number of times they are triggered is >= threshold, fail the DF
Parameters: threshold (int) – the threshold after which the DF fails
Returns: policy for DQM
Return type: (DQMPolicy)
smv.dqm.FailTotalFixPercentPolicy(threshold)[source]¶
For all the fixes in a DQM, if the total number of times they are triggered is >= threshold * total records, fail the DF
Parameters: threshold (double) – the threshold after which the DF fails; value is between 0.0 and 1.0
Returns: policy for DQM
Return type: (DQMPolicy)
smv.dqm.FailTotalRuleCountPolicy(threshold)[source]¶
For all the rules in a DQM, if the total number of times they are triggered is >= threshold, fail the DF
Parameters: threshold (int) – the threshold after which the DF fails
Returns: policy for DQM
Return type: (DQMPolicy)
smv.dqm.FailTotalRulePercentPolicy(threshold)[source]¶
For all the rules in a DQM, if the total number of times they are triggered is >= threshold * total records, fail the DF
Parameters: threshold (double) – the threshold after which the DF fails; value is between 0.0 and 1.0
Returns: policy for DQM
Return type: (DQMPolicy)
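The task policies above reduce to simple threshold predicates. As a rough illustration (plain Python, not the SMV API; the real policies are JVM-side objects), the decision each policy family makes can be sketched as:

```python
def fail_none():
    # FailNone: the task never triggers a DF-level failure.
    return lambda times_triggered, total_records: False

def fail_count(threshold):
    # FailCount(n): fail the DF if the task is triggered >= n times.
    return lambda times_triggered, total_records: times_triggered >= threshold

def fail_percent(threshold):
    # FailPercent(r): fail the DF if the task triggers on >= r fraction
    # of the total number of records.
    return lambda times_triggered, total_records: (
        times_triggered >= threshold * total_records
    )
```

The DQM-level policies (FailTotalRuleCountPolicy etc.) apply the same kind of predicate to the sum of trigger counts across all rules or fixes.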
smv.error module¶
Errors thrown by SMV
exception smv.error.SmvDqmValidationError(dqmValidationResult)[source]¶
Bases: smv.error.SmvRuntimeError
This class has an instance of dqmValidationResult(dict)
exception smv.error.SmvMetadataValidationError(msg)[source]¶
Bases: smv.error.SmvRuntimeError
smv.functions module¶
smv.functions.diceSorensen(c1, c2)[source]¶
2-gram UDF with formula (2 * number of overlapped grams) / (c1.gramCnt + c2.gramCnt)
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: 2-gram
Return type: (Column)
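The gram-based metrics in this module can be illustrated outside Spark. The sketch below is plain Python (not the actual UDF implementation, which operates on Columns) and mirrors the documented formulas using sets of 2-character grams:

```python
def bigram_set(s):
    # Set of overlapping 2-character grams of a lowercased string.
    s = s.lower()
    return {s[i:i + 2] for i in range(len(s) - 1)}

def dice_sorensen(s1, s2):
    # (2 * number of overlapped grams) / (gramCnt of s1 + gramCnt of s2)
    g1, g2 = bigram_set(s1), bigram_set(s2)
    if not g1 and not g2:
        return 0.0
    return 2.0 * len(g1 & g2) / (len(g1) + len(g2))

def ngram2(s1, s2):
    # (number of overlapped grams) / max(gramCnt of s1, gramCnt of s2)
    g1, g2 = bigram_set(s1), bigram_set(s2)
    if not g1 or not g2:
        return 0.0
    return len(g1 & g2) / max(len(g1), len(g2))
```

For example, "night" and "nacht" share only the gram "ht" out of four grams each, giving a Dice-Sørensen score of 0.25.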
smv.functions.jaroWinkler(c1, c2)[source]¶
Jaro-Winkler edit distance metric UDF
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: distances
Return type: (Column)
smv.functions.nGram2(c1, c2)[source]¶
2-gram UDF with formula (number of overlapped grams) / max(c1.gramCnt, c2.gramCnt)
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: 2-gram
Return type: (Column)
smv.functions.nGram3(c1, c2)[source]¶
3-gram UDF with formula (number of overlapped grams) / max(c1.gramCnt, c2.gramCnt)
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: 3-gram
Return type: (Column)
smv.functions.normlevenshtein(c1, c2)[source]¶
Normalized Levenshtein edit distance metric UDF
Parameters: - c1 (Column) – first column
- c2 (Column) – second column
Returns: distances
Return type: (Column)
smv.functions.smvArrayCat(sep, col)[source]¶
For an array-typed column, concatenate the elements into a string with the given separator.
Parameters: - sep – a Python string to separate the fields
- col – a Column with ArrayType
Returns: a Column in StringType with array elements concatenated
Return type: (col)
smv.functions.smvCollectSet(col, datatype)[source]¶
An aggregate function which collects all the values of the given column and creates a set as an array-typed column. Spark 1.6 introduced the collect_set function, so as projects migrate to Spark 1.6 and later, smvCollectSet will be deprecated.
Parameters: - col (Column) – column to be aggregated on
- datatype (DataType) – datatype of the input column
smv.functions.smvCreateLookUp(m, default, outputType)[source]¶
Return a Python UDF which will perform a dictionary lookup on a column
Parameters: - m (dictionary) – a Python dictionary to be applied
- default (any) – default value if dictionary lookup failed
- outputType (DataType) – output value’s data type
Returns: a UDF which can be applied to a column to perform the lookup
Return type: (udf)
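The lookup semantics amount to a dictionary get with a fallback. A plain-Python sketch (not the Spark UDF itself, which also carries an output DataType):

```python
def create_lookup(m, default):
    # Returns a callable that maps a value through the dictionary,
    # falling back to `default` when the lookup fails.
    return lambda v: m.get(v, default)
```

In Spark, the returned UDF would be applied to a column, e.g. `df.select(lookup(col("country_code")))`.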
smv.functions.smvFirst(c, nonNull=False)[source]¶
Variation of Spark’s “first” which can also return null values
Since Spark’s “first” returns the first non-null value, we created our own version, smvFirst, to return the real first value, even if it is null. Alternatively, it can return the first non-null value.
Parameters: - c (Column) – column to extract the first value from
- nonNull (bool) – If false, return the first value even if null. If true, return the first non-null value. Defaults to false.
Returns: first value
Return type: (object)
smv.functions.smvHashKey(head, *others)[source]¶
Create an MD5 hash of the concatenated columns. Returns “prefix” + MD5 hex string (a 32-character string) as the unique key
MD5’s collision rate on real data records can be ignored, based on the following discussion:
https://marc-stevens.nl/research/md5-1block-collision/ The shortest known messages with the same MD5 are 512-bit (64-byte) messages, such as the pair below
4dc968ff0ee35c209572d4777b721587d36fa7b21bdc56b74a3dc0783e7b9518afbfa200a8284bf36e8e4b55b35f427593d849676da0d1555d8360fb5f07fea2 and the (different by two bits) 4dc968ff0ee35c209572d4777b721587d36fa7b21bdc56b74a3dc0783e7b9518afbfa202a8284bf36e8e4b55b35f427593d849676da0d1d55d8360fb5f07fea2 both have MD5 hash 008ee33a9d58b51cfeb425b0959121c9
There are other such pairs, but all are carefully constructed. Theoretically, random collisions will only happen as the data size approaches 2^64 records (since MD5 has 128 bits), which is much larger than the number of records we deal with (a billion is about 2^30). Therefore, using MD5 to hash primary key columns is good enough for creating a unique key.
This function can take 2 forms: - smvHashKey(prefix, col1, col2, …) - smvHashKey(col1, col2, …)
Parameters: - prefix (String) – return string’s prefix
- col. (Column) – columns to be part of hash
Returns: a StringType column as Prefix + MD5 Hex string
Return type: (col)
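Setting the Column machinery aside, the string-level behavior described above can be sketched in plain Python (an illustration, not the SMV implementation; the null-handling convention here is an assumption):

```python
import hashlib

def hash_key(prefix, *vals):
    # Prefix + 32-character MD5 hex digest of the concatenated values;
    # None values are treated as empty strings (an assumed convention).
    concatenated = "".join("" if v is None else str(v) for v in vals)
    return prefix + hashlib.md5(concatenated.encode("utf-8")).hexdigest()
```

The result is always the prefix followed by exactly 32 hex characters, so equal inputs always produce equal keys.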
smv.functions.smvStrCat(head, *others)[source]¶
Concatenate multiple columns into a single string. Similar to the concat and concat_ws functions in Spark, but behaves differently when some columns are null. The Spark versions return null if any of the inputs is null. smvStrCat returns null only if all of the inputs are null; otherwise it coalesces null columns to blank.
This function can take 2 forms: - smvStrCat(sep, col1, col2, …) - smvStrCat(col1, col2, …)
Parameters: - sep (String) – separator for the concatenation
- col. (Column) – columns to be concatenated
Returns: a StringType column
Return type: (col)
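The null-handling rule described above can be sketched in plain Python (an illustration of the semantics, not the Column-based implementation):

```python
def str_cat(sep, *vals):
    # smvStrCat semantics: null only when ALL inputs are null;
    # otherwise null inputs coalesce to the empty string.
    if all(v is None for v in vals):
        return None
    return sep.join("" if v is None else str(v) for v in vals)
```

Contrast with Spark's concat, which would return null as soon as any single input is null.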
smv.helpers module¶
SMV DataFrame Helpers and Column Helpers
This module provides the helper functions on DataFrame objects and Column objects
class smv.helpers.ColumnHelper(col)[source]¶
Bases: object
smvArrayFlatten(elemType)[source]¶
Applies the flatten operation to an array-of-arrays column.
Example
>>> df.select(col('arrayOfArrayOfStr').smvArrayFlatten(StringType()))
Parameters: elemType (DataType or DataFrame) – the array element’s data type, as a DataType object, or a DataFrame from which to infer the element data type
smvDay70()[source]¶
Convert a Timestamp to the number of days from 1970-01-01
Example
>>> df.select(col("dob").smvDay70())
Returns: IntegerType. Number of days from 1970-01-01 (start from 0) Return type: (Column)
smvDayOfMonth()[source]¶
Extract the day-of-month component from a timestamp
Example
>>> df.select(col("dob").smvDayOfMonth())
Returns: IntegerType. Day of month component as integer (range 1-31), or null if input column is null Return type: (Column)
smvDayOfWeek()[source]¶
Extract the day-of-week component from a timestamp
Example
>>> df.select(col("dob").smvDayOfWeek())
Returns: IntegerType. Day of week component as integer (range 1-7, 1 being Monday), or null if input column is null Return type: (Column)
smvGetColName()[source]¶
Returns the name of a Column as a string
Example: >>> df.a.smvGetColName()
Returns: (str)
smvHour()[source]¶
Extract the hour component from a timestamp
Example
>>> df.select(col("dob").smvHour())
Returns: IntegerType. Hour component as integer, or null if input column is null Return type: (Column)
smvIsAllIn(*vals)[source]¶
Returns true if ALL of the Array column’s elements are in the given parameter sequence
Parameters: vals (*any) – vals must be of the same type as the Array content
Example
input DF:
k  v
a  b
c  d
>>> df.select(array(col("k"), col("v")).smvIsAllIn("a", "b", "c").alias("isFound"))
output DF:
isFound
true
false
false
Returns: BooleanType
Return type: (Column)
smvIsAnyIn(*vals)[source]¶
Returns true if ANY one of the Array column’s elements are in the given parameter sequence
Parameters: vals (*any) – vals must be of the same type as the Array content
Example
input DF:
k  v
a  b
c  d
>>> df.select(array(col("k"), col("v")).smvIsAnyIn("a", "b", "c").alias("isFound"))
output DF:
isFound
true
true
false
Returns: BooleanType
Return type: (Column)
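The membership semantics of smvIsAllIn and smvIsAnyIn can be sketched in plain Python (the real helpers act on Array-typed Columns):

```python
def is_all_in(arr, *vals):
    # True if every element of the array appears in vals.
    return all(x in vals for x in arr)

def is_any_in(arr, *vals):
    # True if at least one element of the array appears in vals.
    return any(x in vals for x in arr)
```

So ["a", "b"] passes is_all_in("a", "b", "c"), while ["c", "d"] fails it but still passes is_any_in.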
smvMonth()[source]¶
Extract the month component from a timestamp
Example
>>> df.select(col("dob").smvMonth())
Returns: IntegerType. Month component as integer, or null if input column is null Return type: (Column)
smvMonth70()[source]¶
Convert a Timestamp to the number of months from 1970-01-01
Example
>>> df.select(col("dob").smvMonth70())
Returns: IntegerType. Number of months from 1970-01-01 (start from 0) Return type: (Column)
smvPlusDays(delta)[source]¶
Add N days to a Timestamp or Date column
Parameters: delta (int or Column) – the number of days to add Example
>>> df.select(col("dob").smvPlusDays(3))
Returns: - TimestampType. The incremented Timestamp, or null if input is null.
- Note: even if the input is DateType, the output is TimestampType
Return type: (Column)
Please note that although Spark’s date_add function does a similar thing, they are actually different:
- Both can act on both Timestamp and Date types
- smvPlusDays always returns Timestamp, while F.date_add always returns Date
smvPlusMonths(delta)[source]¶
Add N months to a Timestamp or Date column
Parameters: delta (int or Column) – the number of months to add Note
The calculation will do its best to only change the month field retaining the same day of month. However, in certain circumstances, it may be necessary to alter smaller fields. For example, 2007-03-31 plus one month cannot result in 2007-04-31, so the day of month is adjusted to 2007-04-30.
Example
>>> df.select(col("dob").smvPlusMonths(3))
Returns: - TimestampType. The incremented Timestamp, or null if input is null.
- Note: even if the input is DateType, the output is TimestampType
Return type: (Column)
Please note that although Spark’s add_months function does a similar thing, they are actually different:
- Both can act on both Timestamp and Date types
- smvPlusMonths always returns Timestamp, while F.add_months always returns Date
smvPlusWeeks(delta)[source]¶
Add N weeks to a Timestamp or Date column
Parameters: delta (int or Column) – the number of weeks to add Example
>>> df.select(col("dob").smvPlusWeeks(3))
Returns: - TimestampType. The incremented Timestamp, or null if input is null.
- Note: even if the input is DateType, the output is TimestampType
Return type: (Column)
smvPlusYears(delta)[source]¶
Add N years to a Timestamp or Date column
Parameters: delta (int or Column) – the number of years to add Example
>>> df.select(col("dob").smvPlusYears(3))
Returns: - TimestampType. The incremented Timestamp, or null if input is null.
- Note: even if the input is DateType, the output is TimestampType
Return type: (Column)
smvQuarter()[source]¶
Extract the quarter component from a timestamp
Example
>>> df.select(col("dob").smvQuarter())
Returns: IntegerType. Quarter component as integer (1-based), or null if input column is null Return type: (Column)
smvStrToTimestamp(fmt)[source]¶
Build a timestamp from a string
Parameters: fmt (string) – the format is the same as the Java Date format Example
>>> df.select(col("dob").smvStrToTimestamp("yyyy-MM-dd"))
Returns: TimestampType. The converted Timestamp Return type: (Column)
smvTimeToIndex()[source]¶
smvTime helper to convert an smvTime column to a time index integer
Example smvTime values (as String): “Q201301”, “M201512”, “D20141201”
Example output: 172, 551, 16405 (number of quarters, months, and days from 1970-01-01)
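The index computation can be reproduced outside Spark. The sketch below is plain Python (an illustration, not the SMV implementation) and yields the documented example outputs:

```python
from datetime import date

def smv_time_to_index(smv_time):
    # Quarters, months, or days elapsed since 1970-01-01 for
    # "Qyyyyqq" / "Myyyymm" / "Dyyyymmdd" smvTime strings.
    kind, rest = smv_time[0], smv_time[1:]
    if kind == "Q":
        year, quarter = int(rest[:4]), int(rest[4:])
        return (year - 1970) * 4 + (quarter - 1)
    if kind == "M":
        year, month = int(rest[:4]), int(rest[4:])
        return (year - 1970) * 12 + (month - 1)
    if kind == "D":
        d = date(int(rest[:4]), int(rest[4:6]), int(rest[6:]))
        return (d - date(1970, 1, 1)).days
    raise ValueError("unknown smvTime value: " + smv_time)
```

Running it on the documented examples gives 172, 551, and 16405 respectively.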
smvTimeToLabel()[source]¶
smvTime helper to convert an smvTime column to human-readable form
Example smvTime values (as String): “Q201301”, “M201512”, “D20141201”
Example output: “2013-Q1”, “2015-12”, “2014-12-01”
smvTimeToTimestamp()[source]¶
smvTime helper to convert an smvTime column to a timestamp at the beginning of the given time period.
Example smvTime values (as String): “Q201301”, “M201512”, “D20141201”
Example output: “2013-01-01 00:00:00.0”, “2015-12-01 00:00:00.0”, “2014-12-01 00:00:00.0”
smvTimeToType()[source]¶
smvTime helper to convert an smvTime column to a time type string
Example smvTime values (as String): “Q201301”, “M201512”, “D20141201”
Example output type: “quarter”, “month”, “day”
smvTimestampToStr(timezone, fmt)[source]¶
Build a string from a timestamp and timezone
Parameters: - timezone (string or Column) – the timezone follows the rules in https://www.joda.org/joda-time/apidocs/org/joda/time/DateTimeZone.html#forID-java.lang.String- It can be a string like “America/Los_Angeles” or “+1000”. If it is null, use current system time zone.
- fmt (string) – the format is the same as the Java Date format
Example
>>> df.select(col("ts").smvTimestampToStr("America/Los_Angeles","yyyy-MM-dd HH:mm:ss"))
Returns: StringType. The converted String with given format Return type: (Column)
class smv.helpers.DataFrameHelper(df)[source]¶
Bases: object
peek(pos=1, colRegex='.*')[source]¶
Display a DataFrame row in transposed view
Parameters: - pos (integer) – the n-th row to display, default as 1
- colRegex (string) – show the columns with name matching the regex, default as “.*”
Returns: (None)
peekSave(path, pos=1, colRegex='.*')[source]¶
Write peek result to a file
Parameters: - path (string) – local file name to write
- pos (integer) – the n-th row to display, default as 1
- colRegex (string) – show the columns with name matching the regex, default as “.*”
Returns: (None)
selectByLabel(labels=None)[source]¶
Select columns whose metadata contains the specified labels
If labels is empty, returns a DataFrame with all the unlabeled columns. Throws if no columns satisfy the condition.
Parameters: labels – (list(string)) a list of label strings for the columns to match Example
>>> df.selectByLabel(["tag_1"])
Returns: the DataFrame with the selected columns Return type: (DataFrame)
smvBinHist(*colWithBin)[source]¶
Print distributions of numerical columns, applying the specified bin size
Parameters: colWithBin (*tuple) – each tuple must be of size 2, where the first element is the column name, and the second is the bin size for that column Example
>>> df.smvBinHist(("col_a", 1))
Returns: (None)
smvConcatHist(*cols)[source]¶
Display an EDD histogram of a group of columns (joint distribution)
Parameters: cols (*string) – The columns on which to perform EDD histogram Example
>>> df.smvConcatHist("a", "b")
Returns: (None)
smvCountHist(keys, binSize)[source]¶
Print the distribution of the value frequency on specific columns
Parameters: - keys (list(string)) – the column names on which the EDD histogram is performed
- binSize (integer) – the bin size for the histogram
Example
>>> df.smvCountHist(["k"], 1)
Returns: (None)
smvDedupByKey(*keys)[source]¶
Remove duplicate records from the DataFrame by arbitrarily selecting the first record from each set of records sharing the same primary key or key combination.
Parameters: keys (*string or *Column) – the column names or Columns on which to apply dedup Example
input DataFrame:
id  product  Company
1   A        C1
1   C        C2
2   B        C3
2   B        C4
>>> df.smvDedupByKey("id")
output DataFrame:
id  product  Company
1   A        C1
2   B        C3
>>> df.smvDedupByKey("id", "product")
output DataFrame:
id  product  Company
1   A        C1
1   C        C2
2   B        C3
Returns: a DataFrame without duplicates for the specified keys
Return type: (DataFrame)
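The dedup semantics (keep the first record seen per distinct key combination) can be sketched in plain Python on data like the example above (an illustration, not the RDD-based implementation):

```python
def dedup_by_key(rows, keys):
    # Keep the first record for each distinct combination of key values.
    seen, out = set(), []
    for row in rows:
        k = tuple(row[c] for c in keys)
        if k not in seen:
            seen.add(k)
            out.append(row)
    return out
```

On the example rows, deduping by "id" keeps (1, A, C1) and (2, B, C3); deduping by ("id", "product") also keeps (1, C, C2).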
smvDedupByKeyWithOrder(*keys)[source]¶
Remove duplicated records by selecting the first record relative to a given ordering
The order is specified in another set of parentheses, as follows:
>>> def smvDedupByKeyWithOrder(self, *keys)(*orderCols)
Note
As with the smvDedupByKey method, we use RDD groupBy in the implementation of this method to make sure we can handle a large key space.
Parameters: keys (*string or *Column) – the column names or Columns on which to apply dedup Example
input DataFrame:
id  product  Company
1   A        C1
1   C        C2
2   B        C3
2   B        C4
>>> df.smvDedupByKeyWithOrder(col("id"))(col("product").desc())
output DataFrame:
id  product  Company
1   C        C2
2   B        C3
Returns: a DataFrame without duplicates for the specified keys / order
Return type: (DataFrame)
smvDesc(*colDescs)[source]¶
Adds column descriptions
Parameters: colDescs (*tuple) – tuples of strings where the first is the column name, and the second is the description to add Example
>>> df.smvDesc(("a", "description of col a"), ("b", "description of col b"))
Returns: the DataFrame with column descriptions added Return type: (DataFrame)
smvDescFromDF(descDF)[source]¶
Adds column descriptions from a companion DataFrame
Parameters: descDF (DataFrame) – a companion 2-column descriptionDF that has variable names as column 1 and corresponding variable descriptions as column 2
Example
>>> df.smvDescFromDF(descriptionDF)
Returns: the DataFrame with column descriptions added Return type: (DataFrame)
smvDiscoverPK(n=10000)[source]¶
Find a column combination which uniquely identifies a row in the data
The resulting output is printed out
Note
The algorithm only looks for a set of keys which uniquely identifies the row. There could be more key combinations which can also be the primary key.
Parameters: n (integer) – number of rows the PK discovery algorithm will run on, defaults to 10000 Example
>>> df.smvDiscoverPK(5000)
Returns: (None)
smvDumpDF()[source]¶
Dump the schema and data of the given df to screen for debugging purposes
Similar to the show() method of Spark DataFrames (since Spark 1.3), although the format is slightly different. This function’s format is more convenient for us and hence has remained.
Example
>>> df.smvDumpDF()
Returns: (None)
smvDupeCheck(keys, n=10000)[source]¶
For a given list of potential keys, check for duplicated records, reporting the number of duplications along with all the columns.
Null values are allowed in the potential keys, so duplication on Null valued keys will also be reported.
Parameters: - keys (list(string)) – the key column list which the duplicate check applied
- n (integer) – number of rows from input data for checking duplications, defaults to 10000
Returns: the key columns, plus an “_N” column, plus the remaining columns, for records whose key values appear more than once; “_N” holds the count of duplications of that record’s key values
Return type: (DataFrame)
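The duplicate-reporting semantics can be sketched in plain Python (an illustration, not the Spark implementation):

```python
from collections import Counter

def dupe_check(rows, keys):
    # Return only records whose key combination occurs more than once,
    # each augmented with an "_N" duplication count.
    counts = Counter(tuple(r[k] for k in keys) for r in rows)
    return [dict(r, _N=counts[tuple(r[k] for k in keys)])
            for r in rows
            if counts[tuple(r[k] for k in keys)] > 1]
```

Records with unique key values are dropped from the report; duplicated ones are kept with their count.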
smvEdd(*cols)[source]¶
Display an EDD summary
Parameters: cols (*string) – column names on which to perform EDD summary Example
>>> df.smvEdd("a", "b")
Returns: (None)
smvEddCompare(df2, ignoreColName)[source]¶
Compare 2 DFs by comparing their EDD summary results
The result of the comparison is printed out.
Parameters: - df2 (DataFrame) – the DataFrame to compare with
- ignoreColName (boolean) – specifies whether to ignore column names, default is false
Example
>>> df.smvEddCompare(df2)
>>> df.smvEddCompare(df2, True)
Returns: (None)
smvExpandStruct(*cols)[source]¶
Expand a structure-type column into a group of columns
Parameters: cols (*string) – column names to expand Example
input DF:
[id: string, address: struct<state:string, zip:string, street:string>]
>>> df.smvExpandStruct("address")
output DF:
[id: string, state: string, zip: string, street: string]
Returns: DF with expanded columns Return type: (DataFrame)
smvExportCsv(path, n=None)[source]¶
Export a DataFrame to the local file system
Parameters: - path (string) – relative path to the app running directory on local file system (instead of HDFS)
- n (integer) – optional. number of records to export. default is all records
Note
Since we have to collect the DF and then call Java file operations, the job has to be launched in either local or yarn-client mode. Also, it is the user’s responsibility to make sure that the DF is small enough to fit into the local file system.
Example
>>> df.smvExportCsv("./target/python-test-export-csv.csv")
Returns: (None)
smvFreqHist(*cols)[source]¶
Print an EDD histogram with frequency sorting
Parameters: cols (*string) – The columns on which to perform EDD histogram Example
>>> df.smvFreqHist("a")
Returns: (None)
smvGetDesc(colName=None)[source]¶
Returns column description(s)
Parameters: colName (string) – optional column name for which to get the description. Example
>>> df.smvGetDesc("col_a")
>>> df.smvGetDesc()
Returns: description string of colName, if specified Return type: (string) or:
Returns: a list of (colName, description) pairs for all columns Return type: (list(tuple))
smvGetLabel(colName=None)[source]¶
Returns a list of column label(s)
Parameters: colName (string) – optional column name for which to get the label. Example
>>> df.smvGetLabel("col_a")
>>> df.smvGetLabel()
Returns: a list of label strings of colName, if specified Return type: (list(string)) or:
Returns: a list of (colName, list(labels)) pairs for all columns Return type: (list(tuple))
smvGroupBy(*cols)[source]¶
Similar to groupBy, but instead of creating a GroupedData object, creates an SmvGroupedData object.
See [[org.tresamigos.smv.SmvGroupedDataFunc]] for list of functions that can be applied to the grouped data.
Parameters: cols (*string or *Column) – column names or Column objects to group on Note
This is going away shortly, and users will be able to use the standard Spark groupBy method directly.
Example
>>> df.smvGroupBy(col("k"))
>>> df.smvGroupBy("k")
Returns: grouped data object Return type: (SmvGroupedData)
smvHashSample(key, rate=0.01, seed=23)[source]¶
Sample the df according to the hash of a column
MurmurHash3 algorithm is used for generating the hash
Parameters: - key (string or Column) – column name or Column to sample on
- rate (double) – sample rate in range (0, 1] with a default of 0.01 (1%)
- seed (int) – random generator integer seed with a default of 23
Example
>>> df.smvHashSample(col("key"), rate=0.1, seed=123)
Returns: sampled DF Return type: (DataFrame)
smvHist(*cols)[source]¶
Display an EDD histogram
Each column’s histogram prints separately
Parameters: cols (*string) – The columns on which to perform EDD histogram Example
>>> df.smvHist("a")
Returns: (None)
smvJoinByKey(other, keys, joinType, isNullSafe=False)[source]¶
Joins two DataFrames on a key
The Spark DataFrame join operation does not handle duplicate key names. If both left and right side of the join operation contain the same key, the result DataFrame is unusable.
The smvJoinByKey method allows the user to join two DataFrames using the same join key. Post join, only the left-side keys will remain. In the case of an outer join, coalesce(leftkey, rightkey) will replace the kept left key.
Parameters: - other (DataFrame) – the DataFrame to join with
- keys (list(string)) – a list of column names on which to apply the join
- joinType (string) – choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]
- isNullSafe (boolean) – if true, matches null keys between the left and right tables and keeps them in the output. Default False.
Example
>>> df1.smvJoinByKey(df2, ["k"], "inner")
>>> df_with_null_key.smvJoinByKey(df2, ["k"], "inner", True)
Returns: result of the join operation Return type: (DataFrame)
smvJoinMultipleByKey(keys, joinType='inner')[source]¶
Create a multiple-DF join builder
It is used in conjunction with joinWith and doJoin
Parameters: - keys (list(string)) – a list of column names on which to apply the join
- joinType (string) – choose one of [‘inner’, ‘outer’, ‘leftouter’, ‘rightouter’, ‘leftsemi’]
Example
>>> df.smvJoinMultipleByKey(["k1", "k2"], "inner").joinWith(df2, "_df2").joinWith(df3, "_df3", "leftouter").doJoin()
Returns: the builder object for the multi join operation Return type: (SmvMultiJoin)
smvLabel(colNames, labels)[source]¶
Adds labels to the specified columns
A column may have multiple labels. Adding the same label twice to a column has the same effect as adding that label once.
For multiple colNames, the same set of labels will be added to all of them. When colNames is empty, the set of labels will be added to all columns of the df. The labels parameter must be non-empty.
Parameters: - labels – (list(string)) a list of label strings to add
- colNames – (list(string)) list of names of columns for which to add labels
Example
>>> df.smvLabel(["col_a", "col_b", "col_c"], ["tag_1", "tag_2"])
>>> df.smvLabel([], ["tag_1", "tag_2"])
Returns: the DataFrame with labels added to the specified columns Return type: (DataFrame) or:
Returns: the DataFrame with labels added to all columns Return type: (DataFrame)
smvOverlapCheck(keyColName)[source]¶
For a set of DFs which share the same key column, check the overlap across them
The other DataFrames are specified in another set of parentheses, as follows:
>>> df1.smvOverlapCheck(key)(*df)
Parameters: keyColName (string) – the column name for the key column Examples
>>> df1.smvOverlapCheck("key")(df2, df3, df4)
The output DF has 2 columns:
- key
- flag: a bit-string, e.g. 0110. Each bit represents whether the corresponding original DF has this key
It can be used with EDD to summarize on the flag:
>>> df1.smvOverlapCheck("key")(df2, df3).smvHist("flag")
Returns: the DataFrame with the key and flag columns Return type: (DataFrame)
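The flag construction can be sketched in plain Python on sets of keys (an illustration of the bit-string semantics, not the DataFrame implementation):

```python
def overlap_flags(*key_sets):
    # For each key present in any input, build a bit-string with one bit
    # per input DF: "1" if that input contains the key, else "0".
    all_keys = set().union(*key_sets)
    return {k: "".join("1" if k in s else "0" for s in key_sets)
            for k in all_keys}
```

A key present in the first and second of two inputs gets flag "11"; one present only in the second gets "01".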
smvPrefixFieldNames(prefix)[source]¶
Apply a prefix to all column names in the given DataFrame.
Parameters: prefix (string) – prefix string to be added to all the column names Example
>>> df.smvPrefixFieldNames("x_")
The above will add x_ to the beginning of every column name in the DataFrame. Please note that if the renamed column names overlap with existing columns, the method will error out.
Returns: (DataFrame)
smvRemoveDesc(*colNames)[source]¶
Removes descriptions for the given columns from the DataFrame
Parameters: colNames (*string) – names of columns for which to remove the description Example
>>> df.smvRemoveDesc("col_a", "col_b")
>>> df.smvRemoveDesc()
Returns: the DataFrame with the given column descriptions removed
Return type: (DataFrame) or:
Returns: the DataFrame with all column descriptions removed
Return type: (DataFrame)
smvRemoveLabel(colNames=None, labels=None)[source]¶
Removes labels from the specified columns
For multiple colNames, the same set of labels will be removed from all of them. When colNames is empty, the set of labels will be removed from all columns of the df. When labels is empty, all labels will be removed from the given columns.
If neither columns nor labels are specified, i.e. both parameter lists are empty, then all labels are removed from all columns in the data frame, essentially clearing the label meta data.
Parameters: - labels – (list(string)) a list of label strings to remove
- colNames – (list(string)) list of names of columns for which to remove labels
Example
>>> df.smvRemoveLabel(["col_a"], ["tag_1"])
>>> df.smvRemoveLabel()
Returns: the DataFrame with specified labels removed from the specified columns Return type: (DataFrame) or:
Returns: the DataFrame with all label meta data cleared Return type: (DataFrame)
smvRenameField(*namePairs)[source]¶
Rename one or more fields of a DataFrame
Parameters: namePairs (*tuple) – tuples of strings where the first is the source column name, and the second is the target column name Example
>>> df.smvRenameField(("a", "aa"), ("c", "cc"))
Returns: the DataFrame with renamed fields Return type: (DataFrame)
-
smvSelectMinus
(*cols)[source]¶ Remove one or more columns from current DataFrame
Parameters: cols (*string or *Column) – column names or Columns to remove from the DataFrame Example
>>> df.smvSelectMinus("col1", "col2")
>>> df.smvSelectMinus(col("col1"), col("col2"))
Returns: the resulting DataFrame after removal of columns Return type: (DataFrame)
-
smvSelectPlus
(*cols)[source]¶ Selects all the current columns in the current DataFrame plus the supplied expressions
The new columns are added to the end of the current column list.
Parameters: cols (*Column) – expressions to add to the DataFrame Example
>>> df.smvSelectPlus((col("price") * col("count")).alias("amt"))
Returns: the resulting DataFrame after addition of columns Return type: (DataFrame)
-
smvSkewJoinByKey
(other, joinType, skewVals, key)[source]¶ Join that leverages broadcast (map-side) join of rows with skewed (high-frequency) values
Rows keyed by skewed values are joined via broadcast join, while the remaining rows are joined without broadcast join. Occurrences of skewVals in the other DataFrame should be infrequent enough that the filtered table is small enough for a broadcast join. The result is the union of the two join results.
Parameters: - other (DataFrame) – DataFrame to join with
- joinType (str) – name of type of join (e.g. “inner”)
- skewVals (list(object)) – list of skewed values
- key (str) – key on which to join (also the Column with the skewed values)
Example
>>> df.smvSkewJoinByKey(df2, "inner", [4], "cid")
will broadcast join the rows of df and df2 where col("cid") == 4 and join the remaining rows of df and df2 without broadcast join.
Returns: the result of the join operation Return type: (DataFrame)
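The split-then-union mechanics of this technique can be illustrated with a small pure-Python sketch (illustrative only; the actual method operates on Spark DataFrames, where the skewed part uses a broadcast join):

```python
def skew_join_by_key(left, right, skew_vals, key):
    # Partition both sides by whether the join-key value is skewed.
    skew = set(skew_vals)
    left_skew = [r for r in left if r[key] in skew]
    left_rest = [r for r in left if r[key] not in skew]
    right_skew = [r for r in right if r[key] in skew]
    right_rest = [r for r in right if r[key] not in skew]

    def inner_join(a, b):
        # Naive nested-loop inner join on `key`.
        return [{**ra, **rb} for ra in a for rb in b if ra[key] == rb[key]]

    # In Spark, the skewed part would be a broadcast (map-side) join;
    # the final result is the union of the two partial joins.
    return inner_join(left_skew, right_skew) + inner_join(left_rest, right_rest)

df1 = [{"cid": 4, "v": 1}, {"cid": 5, "v": 2}]
df2 = [{"cid": 4, "w": 10}, {"cid": 5, "w": 20}]
skew_join_by_key(df1, df2, [4], "cid")
# → [{'cid': 4, 'v': 1, 'w': 10}, {'cid': 5, 'v': 2, 'w': 20}]
```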
-
smvUnion
(*dfothers)[source]¶ Unions DataFrames with different numbers of columns by column name and schema
Spark's unionAll ignores column names and schema, and can only be performed on tables with the same number of columns.
Parameters: dfOthers (*DataFrame) – the dataframes to union with Example
>>> df.smvUnion(df2, df3)
Returns: the union of all specified DataFrames Return type: (DataFrame)
-
smvUnpivot
(*cols)[source]¶ Unpivot the selected columns
Given a set of records with value columns, turns the value columns into value rows.
Parameters: cols (*string) – the names of the columns to unpivot Example
input DF:
id X Y Z
1  A B C
2  D E F
3  G H I

>>> df.smvUnpivot("X", "Y", "Z")
output DF:
id column value
1  X      A
1  Y      B
1  Z      C
…  …      …
3  Y      H
3  Z      I
Returns: the unpivoted DataFrame Return type: (DataFrame)
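The transform can be sketched in plain Python (illustrative only; the actual method operates on Spark DataFrames):

```python
def unpivot(rows, cols):
    # Each value column becomes a (column, value) row,
    # keyed by the remaining columns.
    out = []
    for row in rows:
        keys = {k: v for k, v in row.items() if k not in cols}
        for c in cols:
            out.append({**keys, "column": c, "value": row[c]})
    return out

rows = [{"id": 1, "X": "A", "Y": "B", "Z": "C"}]
unpivot(rows, ["X", "Y", "Z"])
# first record: {'id': 1, 'column': 'X', 'value': 'A'}
```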
-
smvUnpivotRegex
(cols, colNameFn, indexColName)[source]¶ Unpivot the selected columns using the specified regex
Parameters: - cols (*string) – the names of the columns to unpivot
- colNameFn (string) – a regex applied to each unpivoted column name; its capture groups determine the output column name and the index value
- indexColName (string) – the name of the index column to be created
Example
input DF:
id A_1   A_2   B_1   B_2
1  1_a_1 1_a_2 1_b_1 1_b_2
2  2_a_1 2_a_2 2_b_1 2_b_2

>>> df.smvUnpivotRegex( ["A_1", "A_2", "B_1", "B_2"], "(.*)_(.*)", "index" )
output DF:
id index A     B
1  1     1_a_1 1_b_1
1  2     1_a_2 1_b_2
2  1     2_a_1 2_b_1
2  2     2_a_2 2_b_2
Returns: the unpivoted DataFrame Return type: (DataFrame)
-
smvWithLabel
(labels=None)[source]¶ Returns all column names in the data frame that contain all the specified labels
If labels is empty, returns all unlabeled columns in the data frame. Will throw if no columns satisfy the condition.
Parameters: labels – (list(string)) a list of label strings for the columns to match Example
>>> df.smvWithLabel(["tag_1", "tag_2"])
Returns: a list of column name strings that match the specified labels Return type: (list(string))
-
topNValsByFreq
(n, col)[source]¶ Get top N most frequent values in Column col
Parameters: - n (int) – maximum number of values
- col (Column) – which column to get values from
Example
>>> df.topNValsByFreq(1, col("cid"))
will return the single most frequent value in the cid column
Returns: most frequent values (type depends on schema) Return type: (list(object))
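The frequency counting can be sketched in plain Python with collections.Counter (illustrative only; the actual method runs on a Spark DataFrame column):

```python
from collections import Counter

def top_n_vals_by_freq(values, n):
    # The n most frequent values, most frequent first.
    return [v for v, _ in Counter(values).most_common(n)]

top_n_vals_by_freq([1, 2, 2, 3, 2, 3], 1)  # → [2]
```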
-
-
class
smv.helpers.
SmvGroupedData
(df, keys, sgd)[source]¶ Bases:
object
The result of running smvGroupBy on a DataFrame. Implements special SMV aggregations.
-
smvDecile
(value_cols, ignoreNull=True)[source]¶ Compute deciles of some columns on grouped data
Simply an alias for smvQuantile(value_cols, 10, ignoreNull)
Parameters: - value_cols (list(str)) – columns to calculate deciles on
- ignoreNull (boolean) – if true, null values' percent ranks will be null; otherwise, since Spark sorts null as smaller than any value, nulls' percent ranks will be zero. Default true.
-
smvFillNullWithPrevValue
(*orderCols)[source]¶ Fill in Null values with “previous” value according to an ordering
Examples
Given DataFrame df representing the table
K T V
a 1 null
a 2 a
a 3 b
a 4 null

we can use
>>> df.smvGroupBy("K").smvFillNullWithPrevValue(col("T").asc())("V")
to produce the result
K T V
a 1 null
a 2 a
a 3 b
a 4 b
Returns: result of filling nulls with the previous value Return type: (DataFrame)
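The forward-fill logic for a single group can be sketched in plain Python (illustrative only; the actual method uses Spark window semantics):

```python
def fill_null_with_prev(rows, order_col, value_col):
    # Sort by the ordering column, then replace nulls with the
    # most recent non-null value seen so far.
    prev = None
    out = []
    for row in sorted(rows, key=lambda r: r[order_col]):
        v = row[value_col]
        if v is None:
            v = prev
        else:
            prev = v
        out.append({**row, value_col: v})
    return out

rows = [{"T": 1, "V": None}, {"T": 2, "V": "a"},
        {"T": 3, "V": "b"}, {"T": 4, "V": None}]
[r["V"] for r in fill_null_with_prev(rows, "T", "V")]
# → [None, 'a', 'b', 'b']
```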
-
smvPercentRank
(value_cols, ignoreNull=True)[source]¶ Compute the percent rank of a sequence of columns within a group in a given DataFrame.
Uses Spark's percentRank window function. The percent rank is defined as R/(N-1), where R is the 0-based rank and N is the population size. Under this definition, the min value (R=0) has percent rank 0.0, and the max value has percent rank 1.0.
For each column for which the percent rank is computed (e.g. “v”), an additional column is added to the output, v_pctrnk
All other columns in the input are untouched and propagated to the output.
Parameters: - value_cols (list(str)) – columns to calculate percentRank on
- ignoreNull (boolean) – if true, null values' percent ranks will be null; otherwise, since Spark sorts null as smaller than any value, nulls' percent ranks will be zero. Default true.
Example
>>> df.smvGroupBy("g", "g2").smvPercentRank(["v1", "v2", "v3"])
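The R/(N-1) definition can be checked with a small pure-Python sketch (illustrative only; ties here take the rank of their first occurrence, as with Spark's percent_rank):

```python
def percent_rank(values):
    # R/(N-1): R is the 0-based ascending rank, N the population size.
    s = sorted(values)
    n = len(values)
    return [s.index(v) / (n - 1) for v in values]

percent_rank([10, 20, 30, 40, 50])  # → [0.0, 0.25, 0.5, 0.75, 1.0]
```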
-
smvPivot
(pivotCols, valueCols, baseOutput)[source]¶ smvPivot adds the pivoted columns without additional aggregation. In other words, N records in, N records out
SmvPivot for Pivot Operations:
A pivot operation on a DataFrame transforms multiple rows per key into columns, so they can be aggregated to a single row per key while preserving all the data variance.
Example
For input data below:
id month product count
1  5/14  A       100
1  6/14  B       200
1  5/14  B       300

We would like to generate a data set ready for aggregations. The desired output is:

id count_5_14_A count_5_14_B count_6_14_A count_6_14_B
1  100          NULL         NULL         NULL
1  NULL         NULL         NULL         200
1  NULL         300          NULL         NULL

The raw input is divided into three parts:
- key column: part of the primary key that is preserved in the output; the id column in the above example.
- pivot columns: the columns whose row values will become the new column names. The cross product of all unique values for each column is used to generate the output column names.
- value column: the value that will be copied/aggregated to the corresponding output column; count in our example.
Example code:
>>> df.smvGroupBy("id").smvPivot([["month", "product"]], ["count"], ["5_14_A", "5_14_B", "6_14_A", "6_14_B"])
Parameters: - pivotCols (list(list(str))) – specify the pivot columns, on above example, it is [[‘month, ‘product]]. If [[‘month’], [‘month’, ‘product’]] is used, the output columns will have “count_5_14” and “count_6_14” as addition to the example.
- valueCols (list(string)) – names of value columns which will be prepared for aggregation
- baseOutput (list(str)) – expected names of the pivoted columns. In the above example, it is ["5_14_A", "5_14_B", "6_14_A", "6_14_B"]. The user is required to supply the list of expected pivot column output names to avoid an extra action on the input DataFrame just to extract the possible pivot columns. If an empty sequence is provided, the base output columns will be extracted from values in the pivot columns (this will cause an action on the entire DataFrame!)
Returns: result of pivot operation
Return type: (DataFrame)
-
smvPivotCoalesce
(pivotCols, valueCols, baseOutput)[source]¶ Perform SmvPivot, then coalesce the output. Please refer to smvPivot's documentation for context and details of the SmvPivot operation.
Parameters: - pivotCols (list(list(str))) – list of lists of column names to pivot
- valueCols (list(string)) – names of value columns to coalesce
- baseOutput (list(str)) – expected names of the pivoted columns
Examples
For example, given a DataFrame df that represents the table
k p v
a c 1
a d 2
a e null
a f 5

we can use
>>> df.smvGroupBy("k").smvPivotCoalesce([['p']], ['v'], ['c', 'd', 'e', 'f'])
to produce the following output
k v_c v_d v_e  v_f
a 1   2   null 5
Returns: result of pivot coalesce Return type: (DataFrame)
-
smvPivotSum
(pivotCols, valueCols, baseOutput)[source]¶ Perform SmvPivot, then sum the results. Please refer to smvPivot's documentation for context and details of the SmvPivot operation.
Parameters: - pivotCols (list(list(str))) – list of lists of column names to pivot
- valueCols (list(string)) – names of value columns to sum
- baseOutput (list(str)) – expected names of the pivoted columns
Examples
For example, given a DataFrame df that represents the table
id month product count
1  5/14  A       100
1  6/14  B       200
1  5/14  B       300

we can use
>>> df.smvGroupBy("id").smvPivotSum([["month", "product"]], ["count"], ["5_14_A", "5_14_B", "6_14_A", "6_14_B"])
to produce the following output
id count_5_14_A count_5_14_B count_6_14_A count_6_14_B
1  100          300          NULL         200
Returns: result of pivot sum Return type: (DataFrame)
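The pivot-then-sum combination can be sketched in plain Python (illustrative only; pivot_sum and its naming scheme are simplified stand-ins for the Spark implementation):

```python
def pivot_sum(rows, key, pivot_cols, value_col, base_output):
    out = {}
    for row in rows:
        # One output record per key, initialized to all nulls.
        rec = out.setdefault(row[key],
                             {f"{value_col}_{b}": None for b in base_output})
        # e.g. month "5/14" + product "A" → output column "count_5_14_A"
        suffix = "_".join(str(row[c]).replace("/", "_") for c in pivot_cols)
        col = f"{value_col}_{suffix}"
        rec[col] = row[value_col] if rec[col] is None else rec[col] + row[value_col]
    return out

rows = [
    {"id": 1, "month": "5/14", "product": "A", "count": 100},
    {"id": 1, "month": "6/14", "product": "B", "count": 200},
    {"id": 1, "month": "5/14", "product": "B", "count": 300},
]
pivot_sum(rows, "id", ["month", "product"], "count",
          ["5_14_A", "5_14_B", "6_14_A", "6_14_B"])
# → {1: {'count_5_14_A': 100, 'count_5_14_B': 300,
#        'count_6_14_A': None, 'count_6_14_B': 200}}
```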
-
smvQuantile
(value_cols, bin_num, ignoreNull=True)[source]¶ Compute the quantile bin numbers within a group in a given DataFrame.
Estimating quantiles and quantile groups from data with an unknown distribution is somewhat arbitrary; multiple 'definitions' are used in different software packages. Please refer to https://en.wikipedia.org/wiki/Quantile#Estimating_quantiles_from_a_sample for details.
smvQuantile is calculated from Spark's percentRank. The algorithm is equivalent to the one labeled R-7, Excel, SciPy-(1,1), Maple-6 on the Wikipedia page above. Please note it is slightly different from SAS's default algorithm (labeled SAS-5).
Returned quantile bin numbers are 1 based. For example when bin_num=10, returned values are integers from 1 to 10, inclusively.
For each column for which the quantile is computed (e.g. “v”), an additional column is added to the output, “v_quantile”.
All other columns in the input are untouched and propagated to the output.
Parameters: - value_cols (list(str)) – columns to calculate quantiles on
- bin_num (int) – number of quantiles, e.g. 4 for quartile, 10 for decile
- ignoreNull (boolean) – if true, null values' percent ranks will be null; otherwise, since Spark sorts null as smaller than any value, nulls' percent ranks will be zero. Default true.
Example
>>> df.smvGroupBy("g", "g2").smvQuantile(["v"], 100)
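The mapping from percent ranks to 1-based bins can be sketched in plain Python. This is a hedged illustration: floor(pr * bin_num) + 1, capped at bin_num, is one plausible mapping; the exact R-7 computation used by SMV may differ in tie and edge handling.

```python
def quantile_bins(values, bin_num):
    # Percent rank per the R/(N-1) definition, then a 1-based bin number.
    s = sorted(values)
    n = len(values)
    bins = []
    for v in values:
        pr = s.index(v) / (n - 1)
        bins.append(min(int(pr * bin_num) + 1, bin_num))
    return bins

quantile_bins([1, 2, 3, 4], 4)  # → [1, 2, 3, 4]
```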
-
smvRePartition
(numParts)[source]¶ Repartition SmvGroupedData using specified partitioner on the keys. A HashPartitioner with the specified number of partitions will be used.
This method is used when the key-space is very large. In the current Spark DF groupBy method, the entire key-space is actually loaded into executor memory, which is very dangerous when the key-space is big. The regular DF repartition function doesn't solve this issue, since a random repartition is not guaranteed to reduce the key-space on each executor. In that case we need to use this function to linearly reduce the key-space.
Example:
>>> df.smvGroupBy("k1", "k2").smvRePartition(32).agg(sum("v").alias("v"))
-
smvTimePanelAgg
(time_col, start, end)[source]¶ Apply aggregation on given keys and specified time panel period
Parameters: - time_col (str) – the column name in the data as the event timestamp
- start (panel.PartialTime) – could be Day, Month, Week, Quarter; refer to the panel package for details
- end (panel.PartialTime) – should be the same time type as the “start”
- addMissingTimeWithNull (default true) – fill null records
Both start and end PartialTime are inclusive.
Example:
>>> df.smvGroupBy("K").smvTimePanelAgg("TS", Week(2012, 1, 1), Week(2012, 12, 31))( sum("V").alias("V") )
Returns: a result data frame with the keys, a column named smvTime, and the aggregated values. Refer to the panel package for the potential forms of different PartialTimes.
Return type: (DataFrame)
When addMissingTimeWithNull is true, the aggregation should always be on variables instead of literals (should NOT be count(lit(1))).
Example with data:
Given DataFrame df as
K TS         V
1 2012-01-01 1.5
1 2012-03-01 4.5
1 2012-07-01 7.5
1 2012-05-01 2.45

after applying
>>> import smv.panel as P
>>> df.smvGroupBy("K").smvTimePanelAgg("TS", P.Quarter(2012, 1), P.Quarter(2012, 2))( sum("V").alias("V") )
the result is
K smvTime V
1 Q201201 6.0
1 Q201202 2.45
-
smvTopNRecs
(maxElems, *cols)[source]¶ For each group, return the top N records according to a given ordering
Example
>>> df.smvGroupBy("id").smvTopNRecs(3, col("amt").desc())
This will keep the 3 largest amt records for each id
Parameters: - maxElems (int) – maximum number of records per group
- cols (*str) – columns defining the ordering
Returns: result of taking top records from groups
Return type: (DataFrame)
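The per-group top-N selection can be sketched in plain Python with heapq (illustrative only; the actual method operates on Spark grouped data and supports multiple ordering columns):

```python
import heapq

def top_n_recs(rows, n, order_col, descending=True):
    # Keep the n largest (or smallest) records of one group,
    # ordered by a single column.
    if descending:
        return heapq.nlargest(n, rows, key=lambda r: r[order_col])
    return heapq.nsmallest(n, rows, key=lambda r: r[order_col])

rows = [{"id": 1, "amt": a} for a in [5, 1, 9, 7]]
[r["amt"] for r in top_n_recs(rows, 3, "amt")]  # → [9, 7, 5]
```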
-
smvWithTimePanel
(time_col, start, end)[source]¶ Add a specified time panel period to the DF
Parameters: - time_col (str) – the column name in the data as the event timestamp
- start (panel.PartialTime) – could be Day, Month, Week, Quarter; refer to the panel package for details
- end (panel.PartialTime) – should be the same time type as the “start”
- addMissingTimeWithNull (boolean) – Default True. when some PartialTime is missing whether to fill null records
Returns: a result data frame with the keys, a column named smvTime, and the other input columns. Refer to the panel package for the potential forms of different PartialTimes.
Return type: (DataFrame)
Note
Since a TimePanel defines a period of time, if some Months (or Quarters) are missing for a group in the data and addMissingTimeWithNull is true, this function will add records with non-null keys, all possible smvTime values, and all other columns null-valued.
Example
Given DataFrame df as
K TS         V
1 2014-01-01 1.2
1 2014-03-01 4.5
1 2014-03-25 10.3

after applying
>>> import smv.panel as P
>>> df.smvGroupBy("k").smvWithTimePanel("TS", P.Month(2014,1), P.Month(2014, 3))
the result is
K TS         V    smvTime
1 2014-01-01 1.2  M201401
1 2014-03-01 4.5  M201403
1 2014-03-25 10.3 M201403
1 None       None M201401
1 None       None M201402
1 None       None M201403
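The missing-time filling described in the Note can be sketched in plain Python (a simplified illustration; with_month_panel and the (year, month) tuples are hypothetical stand-ins for SMV's PartialTime API, and only Month panels are shown):

```python
from datetime import date

def with_month_panel(rows, key, ts_col, start, end):
    # Enumerate all months in [start, end]; start/end are (year, month)
    # tuples, both inclusive.
    months, (y, m) = [], start
    while (y, m) <= end:
        months.append("M%04d%02d" % (y, m))
        y, m = (y, m + 1) if m < 12 else (y + 1, 1)
    # Tag each data record with its smvTime month.
    out = [{**r, "smvTime": "M%04d%02d" % (r[ts_col].year, r[ts_col].month)}
           for r in rows]
    # Add one all-null record per (key, month) so every month appears.
    other_cols = {c for r in rows for c in r} - {key}
    for k in {r[key] for r in rows}:
        for mt in months:
            out.append({key: k, **{c: None for c in other_cols}, "smvTime": mt})
    return out

rows = [{"K": 1, "TS": date(2014, 1, 1), "V": 1.2},
        {"K": 1, "TS": date(2014, 3, 1), "V": 4.5},
        {"K": 1, "TS": date(2014, 3, 25), "V": 10.3}]
result = with_month_panel(rows, "K", "TS", (2014, 1), (2014, 3))
# 3 data records plus 3 null records (M201401..M201403), 6 rows total
```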
-
-
class
smv.helpers.
SmvMultiJoin
(sqlContext, mj)[source]¶ Bases:
object
Wrapper around Scala’s SmvMultiJoin
-
doJoin
(dropextra=False)[source]¶ Trigger the join operation
Parameters: dropextra (boolean) – default false, which keeps all duplicate-named columns with the postfix; when true, the duplicated columns will be dropped Example
>>> joindf.doJoin()
Returns: result of executing the join operation Return type: (DataFrame)
-
joinWith
(df, postfix, jointype=None)[source]¶ Append a join to the SmvMultiJoin chain
Parameters: - df (DataFrame) – the DataFrame to join with
- postfix (string) – postfix to use when renaming join columns
- jointype (string) – optional join type. If not specified, conf.defaultJoinType is used. Choose one of ['inner', 'outer', 'leftouter', 'rightouter', 'leftsemi']
Example
>>> joindf = df1.smvJoinMultipleByKey(['a'], 'inner').joinWith(df2, '_df2').joinWith(df3, '_df3', 'outer')
Returns: formula of the join; call doJoin() on it to execute Return type: (SmvMultiJoin)
-
smv.historical_validators module¶
-
class
smv.historical_validators.
SmvHistoricalValidator
(*args)[source]¶ Bases:
object
Base of all user defined historical validator rules.
Derived classes must override the metadata and validateMetadata methods.
-
smv.historical_validators.
SmvHistoricalValidators
(*validators)[source]¶ Decorator to specify set of validators to apply to an SmvGenericModule.
Each validator should define the standard metadata and validateMetadata methods. In addition, each validator has a unique key that determines how it is stored in the final metadata structure. This decorator takes care of composing the union of all metadata from the validators and decomposing the full metadata structure into individual pieces for each validator (hint: the _key() method of each validator is used to key into the final metadata structure).
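The compose/decompose pattern can be sketched in plain Python. This is an illustrative sketch only: compose_validators and CountValidator are hypothetical names, not SMV's actual implementation.

```python
def compose_validators(*validators):
    # Union of per-validator metadata, keyed by each validator's _key().
    def union_metadata(df):
        return {v._key(): v.metadata(df) for v in validators}

    # Decompose the full metadata back into per-validator pieces.
    def validate_all(cur_meta, hist_meta):
        errors = []
        for v in validators:
            res = v.validateMetadata(cur_meta[v._key()],
                                     [h[v._key()] for h in hist_meta])
            if res is not None:
                errors.append(res)
        return errors

    return union_metadata, validate_all

class CountValidator:
    def _key(self):
        return "count"
    def metadata(self, df):
        return {"n": len(df)}
    def validateMetadata(self, cur, hist):
        if hist and cur["n"] < hist[-1]["n"]:
            return "row count decreased"
        return None

meta_fn, validate_fn = compose_validators(CountValidator())
meta_fn([1, 2, 3])                                       # → {'count': {'n': 3}}
validate_fn({"count": {"n": 2}}, [{"count": {"n": 3}}])  # → ['row count decreased']
```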
smv.jprops module¶
-
smv.jprops.
iter_properties
(fh, comments=False)[source]¶ Incrementally read properties from a Java .properties file.
Yields tuples of key/value pairs.
If comments is True, comments will be included with jprops.COMMENT in place of the key.
Parameters: - fh – a readable file-like object
- comments – should include comments (default: False)
-
smv.jprops.
load_properties
(fh, mapping=<class 'dict'>)[source]¶ Reads properties from a Java .properties file.
Returns a dict (or provided mapping) of properties.
Parameters: - fh – a readable file-like object
- mapping – mapping type to load properties into
-
smv.jprops.
store_properties
(fh, props, comment=None, timestamp=True)[source]¶ Writes properties to the file in Java properties format.
Parameters: - fh – a writable file-like object
- props – a mapping (dict) or iterable of key/value pairs
- comment – comment to write to the beginning of the file
- timestamp – boolean indicating whether to write a timestamp comment
-
smv.jprops.
write_comment
(fh, comment)[source]¶ Writes a comment to the file in Java properties format.
Newlines in the comment text are automatically turned into a continuation of the comment by adding a “#” to the beginning of each line.
Parameters: - fh – a writable file-like object
- comment – comment string to write
smv.matcher module¶
-
smv.matcher.
ExactLogic
(colName, expr)[source]¶ Level match with exact logic
Parameters: - colName (string) – level name used in the output DF
- expr (Column) – match logic condition Column for colName
Example
>>> ExactLogic("First_Name_Match", col("first_name") == col("_first_name"))
Returns: (ExactLogic)
-
smv.matcher.
ExactMatchPreFilter
(colName, expr)[source]¶ Specify the top-level exact match
Parameters: - colName (string) – level name used in the output DF
- expr (Column) – match logic condition Column
Example
>>> ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name"))
Returns: (ExactMatchPreFilter)
-
smv.matcher.
FuzzyLogic
(colName, predicate, valueExpr, threshold)[source]¶ Level match with fuzzy logic
Parameters: - colName (string) – level name used in the output DF
- predicate (Column) – a condition column, no match if this condition evaluates as false
- valueExpr (Column) – a value column, which typically returns a score; a higher score means a higher chance of matching
- threshold (float) – No match if the evaluated valueExpr < threshold
Example
>>> FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"),col("_city")), 0.9)
Returns: (FuzzyLogic)
-
smv.matcher.
GroupCondition
(expr)[source]¶ Specify the shared matching condition of all the levels (except the top-level exact match)
Parameters: expr (Column) – shared matching condition Note
expr should be in "left == right" form so that it can optimize the process by reducing the search space
Example
>>> GroupCondition(soundex("first_name") == soundex("_first_name"))
Returns: (GroupCondition)
-
class
smv.matcher.
SmvEntityMatcher
(leftId, rightId, exactMatchFilter, groupCondition, levelLogics)[source]¶ Bases:
object
Perform multiple level entity matching with exact and/or fuzzy logic
Parameters: - leftId (string) – id column name of left DF (df1)
- rightId (string) – id column name of right DF (df2)
- exactMatchFilter (ExactMatchPreFilter) – exact match condition, if records matched no further tests will be performed
- groupCondition (GroupCondition) – for exact match leftovers, a deterministic condition to narrow down the search space
- levelLogics (list(ExactLogic or FuzzyLogic)) – A list of level match conditions (always weaker than exactMatchFilter), all of them will be tested
Example
code:
SmvEntityMatcher("id", "_id",
    ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name")),
    GroupCondition(soundex("first_name") == soundex("_first_name")),
    [
        ExactLogic("First_Name_Match", col("first_name") == col("_first_name")),
        FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"), col("_city")), 0.9)
    ]
)
-
doMatch
(df1, df2, keepOriginalCols=True)[source]¶ Apply SmvEntityMatcher to the 2 DataFrames
Parameters: - df1 (DataFrame) – DataFrame 1 with an id column with name “id”
- df2 (DataFrame) – DataFrame 2 with an id column with name “id”
- keepOriginalCols (boolean) – whether to keep all input columns of df1 and df2, defaults to true
Example
code:
SmvEntityMatcher("id", "_id",
    ExactMatchPreFilter("Full_Name_Match", col("full_name") == col("_full_name")),
    GroupCondition(soundex("first_name") == soundex("_first_name")),
    [
        ExactLogic("First_Name_Match", col("first_name") == col("_first_name")),
        FuzzyLogic("Levenshtein_City", lit(True), normlevenshtein(col("city"), col("_city")), 0.9)
    ]
).doMatch(df1, df2, False)
Returns: a DataFrame with df1’s id and df2’s id and match flags of all the levels. For levels with fuzzy logic, the matching score is also provided. A column named “MatchBitmap” also provided to summarize all the matching flags. When keepOriginalCols is true, input columns are also kept Return type: (DataFrame)
smv.modulesvisitor module¶
smv.panel module¶
SMV PartialTime and TimePanel Framework
This module defines the most used PartialTime including Day, Month, Week, Quarter, and the TimePanel
PartialTime is the base class of:
- Day
- Month
- Week
- Quarter
PartialTime has the following methods:
- smvTime(): returns (str), will be the value of the smvTime column if added to a DF
- timeIndex(): returns (int), an integer that increases by one as the PartialTime increases by one unit
- timeLabel(): returns (str), a human readable string
- timeType(): returns (str), the type of the PartialTime
-
smv.panel.
Day
(year, month, day)[source]¶ Define an smv.panel.Day
Day extends smv.panel.PartialTime base class
Parameters: - year (int) –
- month (int) –
- day (int) –
Example
>>> d = Day(2012, 5, 31)
>>> d.smvTime()
u'D20120531'
>>> d.timeIndex()
15491
>>> d.timeLabel()
u'2012-05-31'
>>> d.timeType()
u'day'
Returns: (java object smv.panel.Day)
-
smv.panel.
Month
(year, month)[source]¶ Define an smv.panel.Month
Month extends smv.panel.PartialTime base class
Parameters: - year (int) –
- month (int) –
Example
>>> m = Month(2012, 5)
>>> m.smvTime()
u'M201205'
>>> m.timeIndex()
508
>>> m.timeLabel()
u'2012-05'
>>> m.timeType()
u'month'
Returns: (java object smv.panel.Month)
-
smv.panel.
Quarter
(year, quarter)[source]¶ Define an smv.panel.Quarter
Quarter extends smv.panel.PartialTime base class
Parameters: - year (int) –
- quarter (int) –
Example
>>> q = Quarter(2012, 1)
>>> q.smvTime()
u'Q201201'
>>> q.timeIndex()
168
>>> q.timeLabel()
u'2012-Q1'
>>> q.timeType()
u'quarter'
Returns: (java object smv.panel.Quarter)
-
smv.panel.
TimePanel
(start, end)[source]¶ Define an smv.panel.TimePanel
TimePanel is a consecutive range of PartialTimes. It has a "start" PartialTime and an "end" PartialTime, both inclusive. "start" and "end" must have the same timeType.
Parameters: - start (java object smv.PartialTime) – Quarter, Month, Day etc.
- end (java object smv.PartialTime) – Quarter, Month, Day etc.
Example
>>> tp = TimePanel(Day(2012, 1, 1), Day(2013, 12, 31))
Returns: (java object smv.panel.TimePanel)
-
smv.panel.
Week
(year, month, day, start_on='Monday')[source]¶ Define an smv.panel.Week
Week extends smv.panel.PartialTime base class
Parameters: - year (int) –
- month (int) –
- day (int) –
- start_on (str) – Week starts on, valid values: Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday. Default value is Monday
Example
>>> w = Week(2012, 3, 4)
>>> w.smvTime()
u'W20120227'
>>> w.timeIndex()
2200
>>> w.timeLabel()
u'Week of 2012-02-27'
>>> w.timeType()
u'week'
>>> w = Week(2012, 3, 4, "Sunday")
>>> w.timeType()
u'week_start_on_Sunday'
>>> w.smvTime()
u'W(7)20120304'
>>> w.timeIndex()
2201
>>> w.timeLabel()
u'Week of 2012-03-04'
Returns: (java object smv.panel.Week)
smv.provider module¶
SMV Provider API
This module allows users to declare/register providers.
-
class
smv.provider.
SmvProvider
[source]¶ Bases:
object
Base class of all provider classes.
Each provider must inherit from this class and also define a provider_type() static method that returns the type as a string.
Note: there is no version-agnostic way to enforce that derived classes implement a static provider_type(), so the check is done dynamically (at runtime)
-
IS_PROVIDER
= True¶
-
classmethod
provider_type_fqn
()[source]¶ Create a hierarchical provider type fqn for a given provider class based on the provider class hierarchy.
Example (assume provider_type() for class X is "X"):
class A(SmvProvider)
class B(A)
class C(B)
In the above example, C's provider type is just "C", but C's provider_type_fqn is "A.B.C"
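One way such a hierarchical FQN could be built is by walking the class hierarchy (an illustrative sketch, not SMV's actual code; SmvProviderSketch is a hypothetical stand-in for SmvProvider):

```python
class SmvProviderSketch:
    # Base class returns an empty type, which is skipped in the FQN.
    @staticmethod
    def provider_type():
        return ""

    @classmethod
    def provider_type_fqn(cls):
        # Collect provider_type() from base to derived, skipping empties.
        parts = [c.provider_type() for c in reversed(cls.__mro__)
                 if hasattr(c, "provider_type") and c.provider_type()]
        return ".".join(parts)

class A(SmvProviderSketch):
    @staticmethod
    def provider_type(): return "A"

class B(A):
    @staticmethod
    def provider_type(): return "B"

class C(B):
    @staticmethod
    def provider_type(): return "C"

C.provider_type_fqn()  # → 'A.B.C'
```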
-
smv.runconfig module¶
SMV User Run Configuration Parameters
This module defines the SmvRunConfig class, which can be mixed into an SmvModule to get user configuration parameters at run-time.
-
class
smv.runconfig.
SmvRunConfig
[source]¶ Bases:
object
DEPRECATED
Run config accessor methods have been absorbed by SmvGenericModule, so SmvRunConfig is maintained to support existing projects. SmvRunConfig’s influence on the dataset hash is preserved so that modules do not have to transition overnight to using SmvGenericModule.requiresConfig in order for the config to influence the dataset hash.
smv.runinfo module¶
Easy Python access to SmvRunInfoCollector and related Scala classes.
Todo
- document example use
-
class
smv.runinfo.
SmvRunInfo
(meta, metahist)[source]¶ Bases:
object
collection of a module’s running info with:
- metadata
- metahistory
-
class
smv.runinfo.
SmvRunInfoCollector
[source]¶ Bases:
object
A list of SmvRunInfos from a run transaction, and methods to help reporting on them
-
dqm_state
(ds_name)[source]¶ Returns the DQM state for a given dataset
Returns: A dictionary representation of the dqm state Raises: py4j.protocol.Py4JError
– if there is a Java call error or there is no validation result or dqm state for the specified dataset (e.g. caused by a typo in the name)
-
dqm_validation
(ds_name)[source]¶ Returns the DQM validation result for a given dataset
Returns: A dictionary representation of the dqm validation result Raises: py4j.protocol.Py4JError
– if there is a Java call error or there is no validation result for the specified dataset (e.g. caused by a typo in the name)
-
smv.schema_meta_ops module¶
SMV Schema Meta Operations
Provides helper functions for SmvDesc and SmvLabel operations
-
class
smv.schema_meta_ops.
SchemaMetaOps
(df)[source]¶ Bases:
object
-
addLabel
(colNames, labels)[source]¶ Adds labels to the specified columns
If colNames are empty, adds the same set of labels to all columns
-
colsWithLabel
(labels=None)[source]¶ Returns all column names in the data frame that contain all the specified labels
If labels are empty, returns names of unlabeled columns
-
getDesc
(colName)[source]¶ Returns column description(s)
If colName is empty, returns descriptions for all columns
-
getLabel
(colName)[source]¶ Returns a list of column label(s)
If colName is empty, returns labels for all columns
-
smv.smvapp module¶
SmvApp entry class
This module provides the main SMV Python entry point SmvPy class and a singleton smvApp. It is equivalent to SmvApp on the Scala side
-
class
smv.smvapp.
SmvApp
(arglist, _sparkSession, py_module_hotload=True)[source]¶ Bases:
object
The Python representation of SMV.
Its singleton instance is created later in the containing module and is named smvApp
Adds java_imports to the namespace in the JVM gateway in SparkContext (in pyspark). It also creates an instance of SmvPyClient.
-
SEMI_PRIVATE_LIB_PREFIX
= 'dsalib'¶
-
SRC_LIB_PATH
= 'library'¶
-
SRC_PROJECT_PATH
= 'src/main/python'¶
-
all_data_dirs
()[source]¶ All the configured data dirs as an object. Could be dynamic, so it is recalculated on each use
-
copyToHdfs
(fileobj, destination)[source]¶ Copies the content of a file object to an HDFS location.
Parameters: - fileobj (file object) – a file-like object whose content is to be copied, such as one returned by open(), or StringIO
- destination (str) – specifies the destination path in the hadoop file system
The file object is expected to have been opened in binary read mode.
The file object is closed when this function completes.
-
classmethod
createInstance
(arglist, _sparkSession, py_module_hotload=True)[source]¶ Create singleton instance. Also returns the instance.
-
discoverSchemaAsSmvSchema
(path, csvAttributes, n=100000)[source]¶ Discovers the schema of a .csv file and returns a Scala SmvSchema instance
path – path to the csv file
n – number of records used to discover the schema (optional)
csvAttributes – Scala CsvAttributes instance (optional)
-
exception_handling
()[source]¶ Decorator function to catch Py4JJavaError and raise SmvDqmValidationError if any. Otherwise just pass through the original exception
-
getCurrentProperties
()[source]¶ Python dict of the current merged props: defaultProps ++ appConfProps ++ homeConfProps ++ usrConfProps ++ cmdLineProps ++ dynamicRunConfig, where the right side wins in the map merge.
-
getDsHash
(name)[source]¶ The current hashOfHash for the named module as a hex string
Parameters: - name (str) – The uniquen name of a module. Does not have to be the FQN.
- runConfig (dict) – runConfig to apply when collecting info. If module was run with a config, the same config needs to be specified here to retrieve the correct hash.
Returns: The hashOfHash of the named module
Return type: (str)
-
getModuleResult
(fqn, forceRun=False)[source]¶ Run module and get its result, which may not be a DataFrame
-
getRunInfo
(fqn)[source]¶ Returns the run information of a module and all its dependencies from the last run.
Unlike the runModule() method, which returns the run information just for that run, this method returns the run information from the last run.
If no module was run (e.g. the code did not change, so the data is read from persistent storage), the SmvRunInfoCollector returned from the runModule() method would be empty. But the SmvRunInfoCollector returned from this method would contain all the latest run information about all dependent modules.
Parameters: - fqn (str) – fqn of target module
- runConfig (dict) – runConfig to apply when collecting info. If module was run with a config, the same config needs to be specified here to retrieve the info.
Returns: SmvRunInfoCollector
-
getRunInfoByPartialName
(name)[source]¶ Returns the run information of a module and all its dependencies from the last run.
Unlike the runModule() method, which returns the run information just for that run, this method returns the run information from the last run.
If no module was run (e.g. the code did not change, so the data is read from persistent storage), the SmvRunInfoCollector returned from the runModule() method would be empty. But the SmvRunInfoCollector returned from this method would contain all the latest run information about all dependent modules.
Parameters: name (str) – unique suffix to fqn of target module Returns: SmvRunInfoCollector
-
getSchemaByDataFileAsSmvSchema
(data_file_name, path=None)[source]¶ Get the schema of a data file from its path and returns a Scala SmvSchema instance. The result will be None if the corresponding schema file does not exist or is invalid.
-
get_module_state_json
(fqns)[source]¶ Generate a json string for modules’ needToRun state of the app
Parameters: fqns (list(string)) – module fqn list to get state for Returns: json string. E.g. {“stage.mymod”: {“needsToRun” : True}} Return type: (string)
-
get_need_to_run
(roots, keep_roots=False)[source]¶ Given a list of target modules to run, return the modules which will run and be persisted, in the order in which they should run. This is a subset of modules_needed_for_run that keeps only the non-ephemeral, not-yet-persisted modules. Note that some of the roots may not be in this list; to include all roots, set keep_roots to True.
-
get_providers_by_prefix
(fqn_prefix)[source]¶ Discover all providers in the user lib and smv lib that have a provider type fqn starting with the given prefix.
-
smv.smvappinfo module¶
-
class
smv.smvappinfo.
SmvAppInfo
(smvApp)[source]¶ Bases:
object
Provides SmvApp module list, dependency graph etc. for shell and plot. This class is mainly for CLI and GUI. The functions are not core to SmvApp.
smv.smvconfig module¶
Handle command line args and props files
-
class
smv.smvconfig.
SmvConfig
(arglist, _jvm)[source]¶ Bases:
object
Smv configurations, including:
- command line parsing
- read in property files
- dynamic configuration handling
-
df_persist_format
()[source]¶ Spark DF’s default persisted format. Available values:
- smvcsv_on_hdfs
- parquet_on_hdfs (default)
-
get_run_config
(key)[source]¶ Return the run-config value for the given key. Run config is accessed within client modules.
- Two possible sources of run-config:
- dynamic_props (passed in by client code)
- props files / command-line parameters
-
set_app_dir
(new_app_dir)[source]¶ Dynamically reset the app dir, which changes the location of the app and user conf files, and re-read the props files.
smv.smvdriver module¶
-
class
smv.smvdriver.
SmvDriver
[source]¶ Bases:
object
Driver for an SMV application
SmvDriver handles the boilerplate around parsing driver args, constructing an SmvApp, and running an application. To use SmvDriver, override main, then in the main block of your driver script construct your driver and call run.
-
create_smv_app
(smv_args, driver_args)[source]¶ Override this to define how this driver’s SmvApp is created
Default is just SmvApp.createInstance(smv_args). Note that it’s important to use createInstance to ensure that the singleton app is set.
SmvDriver will parse the full CLI args to distinguish the SMV args from the args to your driver.
Parameters: - smv_args (list(str)) – CLI args for SMV (should be passed to SmvApp)
- driver_args (list(str)) – CLI args for the driver
-
smv.smvgenericmodule module¶
-
class
smv.smvgenericmodule.
SmvGenericModule
(smvApp)[source]¶ Bases:
abc.ABC
Abstract base class for all SMV modules, including dataset and task modules
-
IsSmvDataSet
= True¶
-
class
RunParams
(fqn2df)[source]¶ Bases:
object
Map from SmvGenericModule to resulting DataFrame
We need to simulate a dict from ds to df where the same object can be keyed by different datasets with the same fqn. For example, in the module

class X(SmvModule):
    def requiresDS(self):
        return [Foo]
    def run(self, i):
        return i[Foo]

the i argument of the run method should map Foo to the correct DataFrame.
Parameters: (dict) – a map from fqn to DataFrame
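The fqn-keyed lookup described above can be sketched in a few lines; all names here are illustrative stand-ins, not SMV internals.

```python
# Minimal sketch of the RunParams idea: module classes are keyed by
# their fully-qualified name, so distinct class objects with the
# same fqn resolve to the same result.
class RunParamsSketch(object):
    def __init__(self, fqn2df):
        self.fqn2df = fqn2df  # map from fqn string to DataFrame/result

    def __getitem__(self, ds):
        # look up by fqn rather than by object identity
        return self.fqn2df[ds.__module__ + "." + ds.__name__]

class Foo(object):
    pass

i = RunParamsSketch({Foo.__module__ + ".Foo": "foo_result"})
assert i[Foo] == "foo_result"
```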
-
instanceValHash
()[source]¶ Hash computed based on instance values of the dataset, such as the timestamp of an input file
Returns: (int)
-
isEphemeral
()[source]¶ Should this SmvGenericModule skip persisting its data?
Returns: True if this SmvGenericModule should not persist its data, False otherwise Return type: (bool)
-
metadata
(df)[source]¶ User-defined metadata
Override this method to define metadata that will be logged with your module’s results. Defaults to empty dictionary.
Parameters: df (DataFrame) – result of running the module, used to generate metadata Returns: dictionary of serializable metadata Return type: (dict)
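An override might log a row count with the result, as sketched below; the stub base class and FakeDF stand in for SMV's base class and a Spark DataFrame so the example is self-contained.

```python
# Illustrative metadata() override (stubbed, not SMV's real classes)
class SmvGenericModuleStub(object):
    def metadata(self, df):
        return {}  # default: empty metadata

class MyModule(SmvGenericModuleStub):
    def metadata(self, df):
        # log the row count alongside the module's result
        return {"record_count": df.count()}

class FakeDF(object):
    def count(self):
        return 42

assert MyModule().metadata(FakeDF()) == {"record_count": 42}
```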
-
metadataHistorySize
()[source]¶ Override to define the maximum size of the metadata history for this module
Returns: size Return type: (int)
-
needsToRun
()[source]¶ Whether this module needs to run. A non-ephemeral module does not need to run once it is persisted; an ephemeral module does not need to run if none of its requiresDS dependencies need to run.
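The decision described above can be sketched as a plain function; this is an illustrative restatement of the documented rule, not SMV's actual implementation.

```python
# Hypothetical sketch of the needsToRun decision:
# - non-ephemeral: runs only if not yet persisted
# - ephemeral: runs only if at least one dependency needs to run
def needs_to_run(is_ephemeral, is_persisted, deps_need_to_run):
    if not is_ephemeral:
        return not is_persisted
    return any(deps_need_to_run)

assert needs_to_run(False, True, []) is False   # persisted, skip
assert needs_to_run(False, False, []) is True   # not persisted yet
assert needs_to_run(True, False, [False, False]) is False
assert needs_to_run(True, False, [True, False]) is True
```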
-
requiresDS
()[source]¶ User-specified list of dependencies
Override this method to specify the SmvGenericModule needed as inputs.
Returns: a list of dependencies Return type: (list(SmvGenericModule))
-
validateMetadata
(current, history)[source]¶ User-defined metadata validation
Override this method to define validation rules for metadata given the current metadata and historical metadata.
Parameters: - current (dict) – current metadata kv
- history (list(dict)) – list of historical metadata kv’s
Returns: Validation failure message. Return None (or omit a return statement) if successful.
Return type: (str)
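A validation rule might compare the current metadata against the most recent history entry, as in this hedged sketch; the "record_count" key is an assumption for illustration.

```python
# Illustrative validateMetadata() rule: fail when the record count
# drops sharply versus the most recent historical metadata.
def validate_metadata(current, history):
    if not history:
        return None  # nothing to compare against -> success
    prev = history[-1].get("record_count")
    cur = current.get("record_count")
    if prev and cur is not None and cur < 0.5 * prev:
        return "record_count dropped by more than 50%"
    return None  # None signals success

assert validate_metadata({"record_count": 100}, []) is None
assert validate_metadata({"record_count": 10}, [{"record_count": 100}]) is not None
```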
-
version
()[source]¶ Version number. Deprecated!
Returns: version number of this SmvGenericModule Return type: (str)
-
versioned_fqn
¶
-
-
class
smv.smvgenericmodule.
SmvProcessModule
(smvApp)[source]¶ Bases:
smv.smvgenericmodule.SmvGenericModule
Base class for all intermediate data process modules
This is a sub-class of SmvGenericModule and a sibling class of SmvIoModule.
- SmvProcessModule: multiple inputs, single output
- SmvInput: no input, single output
- SmvOutput: single input, no output
Users need to implement:
- requiresDS
- run
-
isEphemeral
()[source]¶ Defaults SmvProcessModule's ephemeral flag to False, so that a module mixing in SmvOutput remains non-ephemeral.
-
requiresConfig
()[source]¶ User-specified list of config keys this module depends on
The given keys and their values will influence the dataset hash
-
requiresLib
()[source]¶ User-specified list of ‘library’ dependencies. These are code, other than the DataSet’s run method, that impacts its output or behaviour.
Override this method to assist in re-running this module based on changes in other python objects (functions, classes, packages).
Limitations: For python modules and packages, the ‘requiresLib()’ method is limited to registering changes on the main file of the package (for module ‘foo’, that’s ‘foo.py’, for package ‘bar’, that’s ‘bar/__init__.py’). This means that if a module or package imports other modules, the imported module’s changes will not impact DataSet hashes.
Returns: a list of library dependencies Return type: (list(module))
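An override can be sketched as below; the stub base class is illustrative, and the stdlib module `string` stands in for a project helper library, which is what you would normally list here.

```python
import string  # stand-in for a project library your module depends on

# Illustrative requiresLib() override (stubbed base class)
class SmvProcessModuleStub(object):
    def requiresLib(self):
        return []  # default: no extra library dependencies

class MyModule(SmvProcessModuleStub):
    def requiresLib(self):
        # changes to these modules' main files alter the dataset hash
        return [string]

assert MyModule().requiresLib() == [string]
```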
-
run
(i)[source]¶ User-specified definition of the operations of this SmvModule
Override this method to define the output of this module, given a map ‘i’ from input SmvGenericModule to resulting DataFrame. ‘i’ will have a mapping for each SmvGenericModule listed in requiresDS. E.g.

def requiresDS(self):
    return [MyDependency]

def run(self, i):
    return i[MyDependency].select("importantColumn")
Parameters: (RunParams) – mapping from input SmvGenericModule to DataFrame Returns: output of this SmvModule Return type: (DataFrame)
smv.smvhdfs module¶
smv.smvinput module¶
SMV DataSet Framework’s SmvInputBase interface
This module defines the abstract classes which form the SmvGenericModule input DS Framework for clients’ projects
-
class
smv.smvinput.
SmvMultiCsvFiles
(smvApp)[source]¶ Bases:
smv.iomod.inputs.SmvMultiCsvInputFiles
Raw input from multiple CSV files sharing a single schema
Instead of a single input file, specify a data dir with files which share the same schema.
-
dir
()[source]¶ Path to the directory containing the csv files and their schema
Returns: path Return type: (str)
-
dirName
()[source]¶ Path to the directory containing the csv files relative to the path defined in the connection
Returns: (str)
-
-
class
smv.smvinput.
SmvCsvFile
(smvApp)[source]¶ Bases:
smv.iomod.inputs.SmvCsvInputFile
Input from a file in CSV format. Base class for CSV file input. Users need to define the path method.
Example:
>>> class MyCsvFile(SmvCsvFile):
>>>     def path(self):
>>>         return "path/relative/to/smvInputDir/file.csv"
-
fileName
()[source]¶ User-specified file name relative to the path defined in the connection
Returns: (string)
-
-
class
smv.smvinput.
SmvSqlCsvFile
(smvApp)[source]¶ Bases:
smv.smvinput.SmvCsvFile
Input from a file in CSV format and using a SQL query to access it
-
query
()[source]¶ Query used to extract data from the table which reads the CSV file
Override this to specify your own query (optional). Default is equivalent to ‘select * from ‘ + tableName.
Returns: query Return type: (str)
-
tableName
= 'df'¶
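Customizing the query can be sketched as below; the stub base class reproduces the documented default so the example is self-contained, and the column names are illustrative.

```python
# Hedged sketch of overriding query() on an SmvSqlCsvFile-style class
class SmvSqlCsvFileStub(object):
    tableName = "df"

    def query(self):
        # default is equivalent to 'select * from ' + tableName
        return "select * from " + self.tableName

class ActiveRows(SmvSqlCsvFileStub):
    def query(self):
        # only keep active rows from the CSV-backed table
        return "select name, dept from " + self.tableName + " where active = 1"

assert ActiveRows().query() == "select name, dept from df where active = 1"
```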
-
-
class
smv.smvinput.
SmvHiveTable
(smvApp)[source]¶ Bases:
smv.iomod.inputs.SmvHiveInputTable
Input from a Hive table. This is for backward compatibility and will be deprecated; please use iomod.SmvHiveInputTable instead.
Users need to implement:
- tableName
Custom queries at read time are no longer supported; please use a downstream module to process the data.
smv.smviostrategy module¶
-
class
smv.smviostrategy.
SmvCsvOnHdfsIoStrategy
(smvApp, path, smvSchema, logger, write_mode='overwrite')[source]¶ Bases:
smv.smviostrategy.SmvIoStrategy
Simple read/write of CSV, given a schema. Not for persisting, which should be handled by SmvCsvPersistenceStrategy
-
class
smv.smviostrategy.
SmvCsvPersistenceStrategy
(smvApp, versioned_fqn, file_path=None)[source]¶ Bases:
smv.smviostrategy.SmvFileOnHdfsPersistenceStrategy
Persist strategy for using Smv CSV IO handler
Parameters: - smvApp (SmvApp) –
- versioned_fqn (str) – data/module’s FQN/Name with hash_of_hash
- file_path (str) – by default, “versioned_fqn” is used to create the data file path; if “file_path” is provided, the other parameters are ignored
-
class
smv.smviostrategy.
SmvFileOnHdfsPersistenceStrategy
(smvApp, versioned_fqn=None, postfix=None, file_path=None)[source]¶ Bases:
smv.smviostrategy.SmvPersistenceStrategy
Abstract class for persisting data to the Hdfs file system, handling general tasks such as file name creation, locking on write, etc.
Parameters: - smvApp (SmvApp) –
- versioned_fqn (str) – data/module’s FQN/Name with hash_of_hash
- postfix (str) – persisted file’s postfix
- file_path (str) – by default, “versioned_fqn” and “postfix” are used to create the data file path; if “file_path” is provided, the other parameters are ignored
-
class
smv.smviostrategy.
SmvHiveIoStrategy
(smvApp, conn_info, table_name, write_mode='errorifexists')[source]¶ Bases:
smv.smviostrategy.SmvIoStrategy
Persist strategy for spark Hive IO
Parameters: - smvApp (SmvApp) –
- conn_info (SmvConnectionInfo) – Hive connection info
- table_name (str) – the table to read from/write to
- write_mode (str) – spark df writer’s SaveMode
-
class
smv.smviostrategy.
SmvIoStrategy
[source]¶ Bases:
abc.ABC
Base class for all module I/O, including read, write and persistence
-
class
smv.smviostrategy.
SmvJdbcIoStrategy
(smvApp, conn_info, table_name, write_mode='errorifexists')[source]¶ Bases:
smv.smviostrategy.SmvIoStrategy
Persist strategy for spark JDBC IO
Parameters: - smvApp (SmvApp) –
- conn_info (SmvConnectionInfo) – Jdbc connection info
- table_name (str) – the table to read from/write to
- write_mode (str) – spark df writer’s SaveMode
-
class
smv.smviostrategy.
SmvNonOpPersistenceStrategy
[source]¶ Bases:
smv.smviostrategy.SmvPersistenceStrategy
Never persist; isPersisted always returns False
-
class
smv.smviostrategy.
SmvParquetPersistenceStrategy
(smvApp, versioned_fqn, file_path=None)[source]¶ Bases:
smv.smviostrategy.SmvFileOnHdfsPersistenceStrategy
Persist strategy for using Spark native parquet
Parameters: - smvApp (SmvApp) –
- versioned_fqn (str) – data/module’s FQN/Name with hash_of_hash
- file_path (str) – parameter “versioned_fqn” is used to create a data file path. However if “file_path” is provided, all the other 2 parameters are ignored
-
class
smv.smviostrategy.
SmvPersistenceStrategy
[source]¶ Bases:
smv.smviostrategy.SmvIoStrategy
Base class for IO strategies used for persisting data
-
class
smv.smviostrategy.
SmvPicklablePersistenceStrategy
(smvApp, versioned_fqn, file_path=None)[source]¶
-
class
smv.smviostrategy.
SmvSchemaOnHdfsIoStrategy
(smvApp, path, write_mode='overwrite')[source]¶ Bases:
smv.smviostrategy.SmvIoStrategy
Read/write of an SmvSchema file on Hdfs
-
class
smv.smviostrategy.
SmvTextOnHdfsIoStrategy
(smvApp, path)[source]¶ Bases:
smv.smviostrategy.SmvIoStrategy
Simple read/write of a small text file on Hdfs
smv.smvlock module¶
smv.smvmodule module¶
SMV DataSet Framework interface
This module defines the abstract classes which form the SmvModule Framework for clients’ projects
-
class
smv.smvmodule.
SmvOutput
[source]¶ Bases:
object
Mixin which marks an SmvModule as one of the outputs of its stage. SmvOutputs are distinct from other SmvModules in that:
- The -s and --run-app options of smv-run only run SmvOutputs and their dependencies.
Deprecated. Will be replaced by sub-classes of smv.io.SmvOutput.
-
IsSmvOutput
= True¶
-
class
smv.smvmodule.
SmvModule
(smvApp)[source]¶ Bases:
smv.smvmodule.SmvSparkDfModule
SmvModule is the same as SmvSparkDfModule. Since it has been used in all SMV projects, we will not change its name, for backward compatibility. It may be deprecated when the full SmvGenericModule framework gets adopted.
-
class
smv.smvmodule.
SmvSqlModule
(smvApp)[source]¶ Bases:
smv.smvmodule.SmvModule
An SMV module which executes a SQL query in place of a run method
-
query
()[source]¶ User-specified SQL query defining the behavior of this module
Before the query is executed, all dependencies will be registered as tables with the names specified in the tables method.
-
requiresDS
()[source]¶ User-specified list of dependencies
Override this method to specify the SmvGenericModule needed as inputs.
Returns: a list of dependencies Return type: (list(SmvGenericModule))
-
run
(i)[source]¶ User-specified definition of the operations of this SmvModule
Override this method to define the output of this module, given a map ‘i’ from input SmvGenericModule to resulting DataFrame. ‘i’ will have a mapping for each SmvGenericModule listed in requiresDS. E.g.

def requiresDS(self):
    return [MyDependency]

def run(self, i):
    return i[MyDependency].select("importantColumn")
Parameters: (RunParams) – mapping from input SmvGenericModule to DataFrame Returns: output of this SmvModule Return type: (DataFrame)
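Putting query() and the tables method together can be sketched as below; the dict shape returned by tables() and all names are assumptions for illustration, not SMV's actual API.

```python
# Self-contained sketch of the SmvSqlModule pattern: tables() maps
# table names to dependency modules, and query() refers to those
# names as registered tables.
class MySqlModule(object):  # in SMV this would extend SmvSqlModule
    def tables(self):
        # hypothetical dependency registered under the name "emp"
        return {"emp": "EmploymentRecords"}

    def query(self):
        return "select dept, count(*) as n from emp group by dept"

m = MySqlModule()
assert "emp" in m.tables()
assert m.query().startswith("select")
```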
-
-
class
smv.smvmodule.
SmvModel
(smvApp)[source]¶ Bases:
smv.smvgenericmodule.SmvProcessModule
SmvModule whose result is a data model
The result must be picklable - see https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled.
-
class
smv.smvmodule.
SmvModelExec
(smvApp)[source]¶ Bases:
smv.smvmodule.SmvModule
SmvModule that runs a model produced by an SmvModel
-
requiresModel
()[source]¶ User-specified SmvModel module
Returns: the SmvModel this module depends on Return type: (SmvModel)
-
run
(i, model)[source]¶ User-specified definition of the operations of this SmvModule
Override this method to define the output of this module, given a map ‘i’ from input SmvGenericModule to resulting DataFrame and the result ‘model’ of the SmvModel specified by requiresModel. ‘i’ will have a mapping for each SmvGenericModule listed in requiresDS. E.g.

def requiresDS(self):
    return [MyDependency]

def run(self, i, model):
    return i[MyDependency].select("importantColumn")
Parameters: - i (RunParams) – mapping from input SmvGenericModule to DataFrame - model (object) – the result of running the SmvModel returned by requiresModel Returns: picklable output of this SmvModule
Return type: (object)
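The SmvModel / SmvModelExec pairing can be sketched as below; plain classes stand in for the real SMV base classes, and the model shape and scoring logic are illustrative.

```python
# One module produces a picklable model, the other applies it.
class MyModel(object):  # would extend smv.smvmodule.SmvModel
    def run(self, i):
        return {"threshold": 0.5}  # any picklable object

class MyModelExec(object):  # would extend smv.smvmodule.SmvModelExec
    def requiresModel(self):
        return MyModel

    def run(self, i, model):
        # apply the model to score input values (illustrative)
        return [x for x in i["rows"] if x > model["threshold"]]

model = MyModel().run({})
scored = MyModelExec().run({"rows": [0.2, 0.9]}, model)
assert scored == [0.9]
```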
-
smv.smvmodulerunner module¶
smv.smvschema module¶
-
class
smv.smvschema.
SmvSchema
(j_smv_schema)[source]¶ Bases:
object
The Python representation of SmvSchema scala class.
Most of the work is still being done on the scala side and this is just a pass through.
smv.smvshell module¶
Helper functions available in SMV’s Python shell
-
smv.smvshell.
quickRun
(name)[source]¶ Run a module and return its result. Does not persist, but uses existing persisted data if possible. No DQM.
-
smv.smvshell.
df
(name, forceRun=False, quickRun=True)[source]¶ The DataFrame result of running the named module
Parameters: - name (str) – The unique name of a module. Does not have to be the FQN.
- forceRun (bool) – True if the module should be forced to run even if it has persisted output. False otherwise.
- quickRun (bool) – skip computing dqm+metadata and persisting csv
Returns: The result of running the named module.
Return type: (DataFrame)
-
smv.smvshell.
dshash
(name)[source]¶ The current hashOfHash for the named module as a hex string
Parameters: - name (str) – The unique name of a module. Does not have to be the FQN.
- runConfig (dict) – runConfig to apply when collecting info. If module was run with a config, the same config needs to be specified here to retrieve the correct hash.
Returns: The hashOfHash of the named module
Return type: (int)
-
smv.smvshell.
getModel
(name, forceRun=False)[source]¶ Get the result of running the named SmvModel module
Parameters: - name (str) – The name of a module. Does not have to be the FQN.
- forceRun (bool) – True if the module should be forced to run even if it has persisted output. False otherwise.
- version (str) – The name of the published version to load from
Returns: The result of running the named module
Return type: (object)
-
smv.smvshell.
openHive
(tableName)[source]¶ Read in a Hive table as a DataFrame
Parameters: tableName (str) – The name of the Hive table Returns: The resulting DataFrame Return type: (DataFrame)
-
smv.smvshell.
openCsv
(path, validate=False)[source]¶ Read in a CSV file as a DataFrame
Parameters: - path (str) – The path of the CSV file
- validate (bool) – If true, validate the CSV before returning the DataFrame (raise an error if malformed)
Returns: The resulting DataFrame
Return type: (DataFrame)
-
smv.smvshell.
ls
(stageName=None)[source]¶ List all datasets in a stage
Parameters: stageName (str) – The name of the stage. Defaults to None, in which ase all datasets in all stages will be listed.
-
smv.smvshell.
lsDead
(stageName=None)[source]¶ List dead datasets in a stage
Parameters: stageName (str) – The name of the stage. Defaults to None, in which case all datasets in all stages will be listed.
-
smv.smvshell.
props
()[source]¶ The current app properties used by SMV after the app, user, command-line and dynamic props are merged.
Returns: The ‘mergedProps’ or final props used by SMV Return type: (dict)
-
smv.smvshell.
exportToHive
(dsname)[source]¶ Export dataset’s running result to a Hive table
Parameters: dsname (str) – The name of an SmvModule
-
smv.smvshell.
ancestors
(dsname)[source]¶ List all ancestors of a dataset
Ancestors of a dataset are the datasets it depends on, directly or indirectly, including datasets from other stages.
Parameters: dsname (str) – The name of an SmvGenericModule
-
smv.smvshell.
descendants
(dsname)[source]¶ List all descendants of a dataset
Descendants of a dataset are the datasets which depend on it, directly or indirectly, including datasets from other stages
Parameters: dsname (str) – The name of an SmvGenericModule
-
smv.smvshell.
smvDiscoverSchemaToFile
(path, n=100000, ca=None)[source]¶ Try its best to discover a schema from a raw CSV file
Will save a schema file with the postfix “.toBeReviewed” in the local directory.
Parameters: - path (str) – Path to the CSV file
- n (int) – Number of records to check for schema discovery, default 100k
- ca (CsvAttributes) – Defaults to CsvWithHeader
-
smv.smvshell.
run_test
(test_name)[source]¶ Run a test with the given name without creating new Spark context
First reloads SMV and the test from source, then runs the test.
Parameters: test_name (str) – Name of the test to run
-
smv.smvshell.
show_run_info
(collector)[source]¶ Inspects the SmvRunInfoCollector object returned by smvApp.runModule
-
smv.smvshell.
get_run_info
(name, runConfig=None)[source]¶ Get the SmvRunInfoCollector with full information about a module and its dependencies
Parameters: - name (str) – name of the module whose information to collect
- runConfig (dict) – runConfig to apply when collecting info. If module was run with a config, the same config needs to be specified here to retrieve the info.
smv.utils module¶
-
smv.utils.
infer_full_name_from_part
(full_names, part_name)[source]¶ For a given partial name (postfix), infer the full name from a list of full names
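A plausible sketch of this postfix matching, assuming a partial name must match the end of an fqn on a dotted boundary (the exact matching rule is an assumption):

```python
# Keep only the full names whose dotted suffix equals part_name
def infer_full_name_from_part(full_names, part_name):
    return [
        fqn for fqn in full_names
        if fqn == part_name or fqn.endswith("." + part_name)
    ]

names = ["stage1.employment.Employment", "stage2.category.Category"]
assert infer_full_name_from_part(names, "Employment") == ["stage1.employment.Employment"]
```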