Source code for smv.helpers

# This file is licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SMV DataFrame Helpers and Column Helpers

    This module provides helper functions for DataFrame objects and Column objects.
"""
import sys
import inspect

import decorator
from pyspark import SparkContext
from pyspark.sql import DataFrame
from pyspark.sql.column import Column
import pyspark.sql.functions as F
from pyspark.sql.types import DataType

from smv.utils import smv_copy_array
from smv.error import SmvRuntimeError
from smv.utils import is_string
from smv.schema_meta_ops import SchemaMetaOps


# common converters to pass to _to_seq and _to_list
def _jcol(c): return c._jc
def _jdf(df): return df._jdf

# Modified from Spark column.py
def _to_seq(cols, converter=None):
    """
    Convert a list of Column (or names) into a JVM Seq of Column.

    An optional `converter` could be used to convert items in `cols`
    into JVM Column objects.
    """
    if converter:
        cols = [converter(c) for c in cols]
    return _sparkContext()._jvm.PythonUtils.toSeq(cols)

# Modified from Spark column.py
def _to_list(cols, converter=None):
    """
    Convert a list of Column (or names) into a JVM (Scala) List of Column.

    An optional `converter` could be used to convert items in `cols`
    into JVM Column objects.
    """
    if converter:
        cols = [converter(c) for c in cols]
    return _sparkContext()._jvm.PythonUtils.toList(cols)

def _sparkContext():
    return SparkContext._active_spark_context

[docs]class SmvGroupedData(object): """The result of running `smvGroupBy` on a DataFrame. Implements special SMV aggregations. """ def __init__(self, df, keys, sgd): self.df = df self.keys = keys self.sgd = sgd
[docs] def smvTopNRecs(self, maxElems, *cols): """For each group, return the top N records according to a given ordering Example: >>> df.smvGroupBy("id").smvTopNRecs(3, col("amt").desc()) This will keep the 3 largest amt records for each id Args: maxElems (int): maximum number of records per group cols (\*str): columns defining the ordering Returns: (DataFrame): result of taking top records from groups """ return DataFrame(self.sgd.smvTopNRecs(maxElems, smv_copy_array(self.df._sc, *cols)), self.df.sql_ctx)
[docs] def smvPivot(self, pivotCols, valueCols, baseOutput): """smvPivot adds the pivoted columns without additional aggregation. In other words, N records in, N records out. SmvPivot for pivot operations: a pivot operation on a DataFrame transforms multiple rows per key into columns, so they can be aggregated to a single row for a given key while preserving all the data variance. Example: For input data below: +-----+-------+---------+-------+ | id | month | product | count | +=====+=======+=========+=======+ | 1 | 5/14 | A | 100 | +-----+-------+---------+-------+ | 1 | 6/14 | B | 200 | +-----+-------+---------+-------+ | 1 | 5/14 | B | 300 | +-----+-------+---------+-------+ We would like to generate a data set to be ready for aggregations. The desired output is: +-----+--------------+--------------+--------------+--------------+ | id | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B | +=====+==============+==============+==============+==============+ | 1 | 100 | NULL | NULL | NULL | +-----+--------------+--------------+--------------+--------------+ | 1 | NULL | NULL | NULL | 200 | +-----+--------------+--------------+--------------+--------------+ | 1 | NULL | 300 | NULL | NULL | +-----+--------------+--------------+--------------+--------------+ The raw input is divided into three parts. 1. key column: part of the primary key that is preserved in the output. That would be the `id` column in the above example. 2. pivot columns: the columns whose row values will become the new column names. The cross product of all unique values for *each* column is used to generate the output column names. 3. value column: the value that will be copied/aggregated to the corresponding output column. `count` in our example. Example code: >>> df.smvGroupBy("id").smvPivot([["month", "product"]], ["count"], ["5_14_A", "5_14_B", "6_14_A", "6_14_B"]) Args: pivotCols (list(list(str))): specify the pivot columns; in the above example, it is [['month', 'product']]. If [['month'], ['month', 'product']] is used, the output columns will have "count_5_14" and "count_6_14" in addition to the example. valueCols (list(string)): names of value columns which will be prepared for aggregation baseOutput (list(str)): expected names of the pivoted columns. In the above example, it is ["5_14_A", "5_14_B", "6_14_A", "6_14_B"]. The user is required to supply the list of expected pivot column output names to avoid an extra action on the input DataFrame just to extract the possible pivot columns. If an empty sequence is provided, the base output columns will be extracted from values in the pivot columns (will cause an action on the entire DataFrame!) Returns: (DataFrame): result of pivot operation """ return DataFrame(self.sgd.smvPivot(smv_copy_array(self.df._sc, *pivotCols), smv_copy_array(self.df._sc, *valueCols), smv_copy_array(self.df._sc, *baseOutput)), self.df.sql_ctx)
[docs] def smvPivotSum(self, pivotCols, valueCols, baseOutput): """Perform SmvPivot, then sum the results. Please refer to smvPivot's documentation for context and details of the SmvPivot operation. Args: pivotCols (list(list(str))): list of lists of column names to pivot valueCols (list(string)): names of value columns to sum baseOutput (list(str)): expected names of the pivoted columns Examples: For example, given a DataFrame df that represents the table +-----+-------+---------+-------+ | id | month | product | count | +=====+=======+=========+=======+ | 1 | 5/14 | A | 100 | +-----+-------+---------+-------+ | 1 | 6/14 | B | 200 | +-----+-------+---------+-------+ | 1 | 5/14 | B | 300 | +-----+-------+---------+-------+ we can use >>> df.smvGroupBy("id").smvPivotSum([["month", "product"]], ["count"], ["5_14_A", "5_14_B", "6_14_A", "6_14_B"]) to produce the following output +-----+--------------+--------------+--------------+--------------+ | id | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B | +=====+==============+==============+==============+==============+ | 1 | 100 | 300 | NULL | 200 | +-----+--------------+--------------+--------------+--------------+ Returns: (DataFrame): result of pivot sum """ return DataFrame(self.sgd.smvPivotSum(smv_copy_array(self.df._sc, *pivotCols), smv_copy_array(self.df._sc, *valueCols), smv_copy_array(self.df._sc, *baseOutput)), self.df.sql_ctx)
[docs] def smvPivotCoalesce(self, pivotCols, valueCols, baseOutput): """Perform SmvPivot, then coalesce the output Please refer to smvPivot's documentation for context and details of the SmvPivot operation. Args: pivotCols (list(list(str))): list of lists of column names to pivot valueCols (list(string)): names of value columns to coalesce baseOutput (list(str)): expected names of the pivoted columns Examples: For example, given a DataFrame df that represents the table +---+---+----+ | k| p| v| +===+===+====+ | a| c| 1| +---+---+----+ | a| d| 2| +---+---+----+ | a| e|null| +---+---+----+ | a| f| 5| +---+---+----+ we can use >>> df.smvGroupBy("k").smvPivotCoalesce([['p']], ['v'], ['c', 'd', 'e', 'f']) to produce the following output +---+---+---+----+---+ | k|v_c|v_d| v_e|v_f| +===+===+===+====+===+ | a| 1| 2|null| 5| +---+---+---+----+---+ Returns: (DataFrame): result of pivot coalesce """ return DataFrame(self.sgd.smvPivotCoalesce(smv_copy_array(self.df._sc, *pivotCols), smv_copy_array(self.df._sc, *valueCols), smv_copy_array(self.df._sc, *baseOutput)), self.df.sql_ctx)
[docs] def smvRePartition(self, numParts): """Repartition SmvGroupedData using the specified partitioner on the keys. A HashPartitioner with the specified number of partitions will be used. This method is used when the key-space is very large. In the current Spark DF's groupBy method, the entire key-space is actually loaded into the executor's memory, which is very dangerous when the key-space is big. The regular DF's repartition function doesn't solve this issue, since a random repartition is not guaranteed to reduce the key-space on each executor. In that case we need to use this function to linearly reduce the key-space. Example: >>> df.smvGroupBy("k1", "k2").smvRePartition(32).agg(sum("v").alias("v")) """ jgdadp = self.sgd.smvRePartition(numParts) df = DataFrame(jgdadp.toDF(), self.df.sql_ctx) return SmvGroupedData(df, self.keys, jgdadp)
[docs] def smvFillNullWithPrevValue(self, *orderCols): """Fill in null values with the "previous" value according to an ordering Examples: Given DataFrame df representing the table +---+---+------+ | K | T | V | +===+===+======+ | a | 1 | null | +---+---+------+ | a | 2 | a | +---+---+------+ | a | 3 | b | +---+---+------+ | a | 4 | null | +---+---+------+ we can use >>> df.smvGroupBy("K").smvFillNullWithPrevValue(col("T").asc())("V") to produce the result +---+---+------+ | K | T | V | +===+===+======+ | a | 1 | null | +---+---+------+ | a | 2 | a | +---+---+------+ | a | 3 | b | +---+---+------+ | a | 4 | b | +---+---+------+ Returns: (DataFrame): result of filling nulls with the previous value """ def __doFill(*valueCols): return DataFrame(self.sgd.smvFillNullWithPrevValue(smv_copy_array(self.df._sc, *orderCols), smv_copy_array(self.df._sc, *valueCols)), self.df.sql_ctx) return __doFill
[docs] def smvWithTimePanel(self, time_col, start, end): """Add a specified time panel period to the DF Args: time_col (str): the column name in the data as the event timestamp start (panel.PartialTime): could be Day, Month, Week, Quarter; refer to the panel package for details end (panel.PartialTime): should be the same time type as the "start" addMissingTimeWithNull (boolean): default True; whether to fill null records when some PartialTime is missing Returns: (DataFrame): a result data frame with keys, and a column with name `smvTime`, and other input columns. Refer to the panel package for the potential forms of different PartialTimes. Note: Since `TimePanel` defines a period of time, if for some group in the data there are missing Months (or Quarters), when addMissingTimeWithNull is true, this function will add records with non-null keys and all possible `smvTime` values, with all other columns null-valued. Example: Given DataFrame df as +---+------------+------+ | K | TS | V | +===+============+======+ | 1 | 2014-01-01 | 1.2 | +---+------------+------+ | 1 | 2014-03-01 | 4.5 | +---+------------+------+ | 1 | 2014-03-25 | 10.3 | +---+------------+------+ after applying >>> import smv.panel as P >>> df.smvGroupBy("k").smvWithTimePanel("TS", P.Month(2014,1), P.Month(2014, 3)) the result is +---+------------+------+---------+ | K | TS | V | smvTime | +===+============+======+=========+ | 1 | 2014-01-01 | 1.2 | M201401 | +---+------------+------+---------+ | 1 | 2014-03-01 | 4.5 | M201403 | +---+------------+------+---------+ | 1 | 2014-03-25 | 10.3 | M201403 | +---+------------+------+---------+ | 1 | None | None | M201401 | +---+------------+------+---------+ | 1 | None | None | M201402 | +---+------------+------+---------+ | 1 | None | None | M201403 | +---+------------+------+---------+ """ return DataFrame(self.sgd.smvWithTimePanel(time_col, start, end), self.df.sql_ctx)
[docs] def smvTimePanelAgg(self, time_col, start, end): """Apply aggregation on the given keys and the specified time panel period Args: time_col (str): the column name in the data as the event timestamp start (panel.PartialTime): could be Day, Month, Week, Quarter; refer to the panel package for details end (panel.PartialTime): should be the same time type as the "start" addMissingTimeWithNull (boolean): default True; whether to fill null records when some PartialTime is missing Both `start` and `end` PartialTime are inclusive. Example: >>> df.smvGroupBy("K").smvTimePanelAgg("TS", Week(2012, 1, 1), Week(2012, 12, 31))( sum("V").alias("V") ) Returns: (DataFrame): a result data frame with keys, and a column with name `smvTime`, and aggregated values. Refer to the panel package for the potential forms of different PartialTimes. When addMissingTimeWithNull is true, the aggregation should always be on the variables instead of on literals (should NOT be count(lit(1))). Example with data: Given DataFrame df as +---+------------+------+ | K | TS | V | +===+============+======+ | 1 | 2012-01-01 | 1.5 | +---+------------+------+ | 1 | 2012-03-01 | 4.5 | +---+------------+------+ | 1 | 2012-07-01 | 7.5 | +---+------------+------+ | 1 | 2012-05-01 | 2.45 | +---+------------+------+ after applying >>> import smv.panel as P >>> df.smvGroupBy("K").smvTimePanelAgg("TS", P.Quarter(2012, 1), P.Quarter(2012, 2))( sum("V").alias("V") ) the result is +---+---------+------+ | K | smvTime | V | +===+=========+======+ | 1 | Q201201 | 6.0 | +---+---------+------+ | 1 | Q201202 | 2.45 | +---+---------+------+ """ def __doAgg(*aggs): return DataFrame(self.sgd.smvTimePanelAgg(time_col, start, end, smv_copy_array(self.df._sc, *aggs)), self.df.sql_ctx) return __doAgg
[docs] def smvPercentRank(self, value_cols, ignoreNull=True): """Compute the percent rank of a sequence of columns within a group in a given DataFrame. Uses Spark's `percentRank` window function. The percent rank is defined as `R/(N-1)`, where `R` is the base 0 rank, and `N` is the population size. Under this definition, the min value (R=0) has percent rank `0.0`, and the max value has percent rank `1.0`. For each column for which the percent rank is computed (e.g. "v"), an additional column is added to the output, `v_pctrnk`. All other columns in the input are untouched and propagated to the output. Args: value_cols (list(str)): columns to calculate percentRank on ignoreNull (boolean): if true, null values' percent ranks will be null; otherwise, since Spark's sort considers null smaller than any value, nulls' percent ranks will be zero. Default true. Example: >>> df.smvGroupBy("g", "g2").smvPercentRank(["v1", "v2", "v3"]) """ return DataFrame(self.sgd.smvPercentRank(smv_copy_array(self.df._sc, *value_cols), ignoreNull), self.df.sql_ctx)
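# Illustrative sketch only (not part of the SMV API): the percent-rank computation described
# above can be approximated in plain PySpark with the percent_rank window function. Column
# names "g" and "v" are made up for the example, and the ignoreNull handling is omitted.
def _percent_rank_sketch(df, group_col="g", value_col="v"):
    from pyspark.sql import Window
    w = Window.partitionBy(group_col).orderBy(value_col)
    # Add the "<value>_pctrnk" column while keeping all original columns.
    return df.withColumn(value_col + "_pctrnk", F.percent_rank().over(w))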
[docs] def smvQuantile(self, value_cols, bin_num, ignoreNull=True): """Compute the quantile bin numbers within a group in a given DataFrame. Estimating quantiles and quantile groups for data with an unknown distribution is somewhat arbitrary. There are multiple 'definitions' used in different software packages. Please refer to https://en.wikipedia.org/wiki/Quantile#Estimating_quantiles_from_a_sample for details. `smvQuantile` is calculated from Spark's `percentRank`. The algorithm is equivalent to the one labeled `R-7, Excel, SciPy-(1,1), Maple-6` on the above wikipedia page. Please note it is slightly different from SAS's default algorithm (labeled SAS-5). Returned quantile bin numbers are 1-based. For example, when `bin_num=10`, returned values are integers from 1 to 10, inclusive. For each column for which the quantile is computed (e.g. "v"), an additional column is added to the output, "v_quantile". All other columns in the input are untouched and propagated to the output. Args: value_cols (list(str)): columns to calculate quantiles on bin_num (int): number of quantiles, e.g. 4 for quartile, 10 for decile ignoreNull (boolean): if true, null values' percent ranks will be null; otherwise, since Spark's sort considers null smaller than any value, nulls' percent ranks will be zero. Default true. Example: >>> df.smvGroupBy("g", "g2").smvQuantile(["v"], 100) """ return DataFrame(self.sgd.smvQuantile(smv_copy_array(self.df._sc, *value_cols), bin_num, ignoreNull), self.df.sql_ctx)
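# Illustrative sketch only (not part of the SMV API): one way to turn the percent rank above
# into 1-based quantile bins. Null handling and the exact R-7 tie behavior are not reproduced.
def _quantile_bin_sketch(df, group_col, value_col, bin_num):
    from pyspark.sql import Window
    w = Window.partitionBy(group_col).orderBy(value_col)
    pr = F.percent_rank().over(w)
    # Map a percent rank in [0, 1] to an integer bin in [1, bin_num].
    bin_col = F.least(F.floor(pr * bin_num) + 1, F.lit(bin_num)).cast("integer")
    return df.withColumn(value_col + "_quantile", bin_col)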
[docs] def smvDecile(self, value_cols, ignoreNull=True): """Compute deciles of some columns on a grouped data Simply an alias to `smvQuantile(value_cols, 10, ignoreNull)` Args: value_cols (list(str)): columns to calculate deciles on ignoreNull (boolean): if true, null values' percent ranks will be null; otherwise, since Spark's sort considers null smaller than any value, nulls' percent ranks will be zero. Default true. """ return self.smvQuantile(value_cols, 10, ignoreNull)
[docs]class SmvMultiJoin(object): """Wrapper around Scala's SmvMultiJoin""" def __init__(self, sqlContext, mj): self.sqlContext = sqlContext self.mj = mj
[docs] def joinWith(self, df, postfix, jointype = None): """Append SmvMultiJoin Chain Args: df (DataFrame): the DataFrame to join with postfix (string): postfix to use when renaming join columns jointype (string): optional jointype. if not specified, `conf.defaultJoinType` is used. choose one of ['inner', 'outer', 'leftouter', 'rightouter', 'leftsemi'] Example: >>> joindf = df1.smvJoinMultipleByKey(['a'], 'inner').joinWith(df2, '_df2').joinWith(df3, '_df3', 'outer') Returns: (SmvMultiJoin): formula of the join. need to call `doJoin()` on it to execute """ return SmvMultiJoin(self.sqlContext, self.mj.joinWith(df._jdf, postfix, jointype))
[docs] def doJoin(self, dropextra = False): """Trigger the join operation Args: dropExtra (boolean): default false, which will keep all duplicated name columns with the postfix. when true, the duplicated columns will be dropped Example: >>> joindf.doJoin() Returns: (DataFrame): result of executing the join operation """ return DataFrame(self.mj.doJoin(dropextra), self.sqlContext)
def _getUnboundMethod(helperCls, oldMethod): def newMethod(_oldMethod, self, *args, **kwargs): return _oldMethod(helperCls(self), *args, **kwargs) return decorator.decorate(oldMethod, newMethod) def _helpCls(receiverCls, helperCls): iscallable = lambda f: hasattr(f, "__call__") for name, oldMethod in inspect.getmembers(helperCls, predicate=iscallable): # We will use decorator.decorate to ensure that attributes of oldMethod, like # docstring and signature, are inherited by newMethod. decorator.decorate # won't accept an unbound method, so for Python 2 we extract oldMethod's # implementing function __func__. In Python 3, inspect.getmembers will return # the implementing functions instead of unbound methods - this is due to # Python 3's data model. try: impl = oldMethod.__func__ except: impl = oldMethod if not name.startswith("_"): newMethod = _getUnboundMethod(helperCls, impl) setattr(receiverCls, name, newMethod)
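# Illustrative demo only: how the two functions above graft helper methods onto a receiver
# class. The classes below are made up for the example and are not part of SMV.
def _helpCls_demo():
    class Receiver(object):
        def __init__(self, value):
            self.value = value

    class ReceiverHelper(object):
        def __init__(self, receiver):
            self.receiver = receiver

        def doubled(self):
            """Return twice the receiver's value."""
            return self.receiver.value * 2

    _helpCls(Receiver, ReceiverHelper)
    # Receiver instances now expose the helper's public methods, and decorator.decorate
    # preserves the helper method's docstring and signature on the receiver side.
    return Receiver(21).doubled()   # => 42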
[docs]class DataFrameHelper(object): def __init__(self, df): self.df = df self._sc = df._sc self._sql_ctx = df.sql_ctx self._jdf = df._jdf self._jPythonHelper = df._sc._jvm.SmvPythonHelper self._jDfHelper = df._sc._jvm.SmvDFHelper(df._jdf) self._SchemaMetaOps = SchemaMetaOps(df)
[docs] def smvExpandStruct(self, *cols): """Expand structure type column to a group of columns Args: cols (\*string): column names to expand Example: input DF: [id: string, address: struct<state:string, zip:string, street:string>] >>> df.smvExpandStruct("address") output DF: [id: string, state: string, zip: string, street: string] Returns: (DataFrame): DF with expanded columns """ jdf = self._jPythonHelper.smvExpandStruct(self._jdf, smv_copy_array(self._sc, *cols)) return DataFrame(jdf, self._sql_ctx)
[docs] def smvGroupBy(self, *cols): """Similar to groupBy, but instead of creating GroupedData, creates an `SmvGroupedData` object. See [[org.tresamigos.smv.SmvGroupedDataFunc]] for the list of functions that can be applied to the grouped data. Args: cols (\*string or \*Column): column names or Column objects to group on Note: This is going away shortly and users will be able to use the standard Spark `groupBy` method directly. Example: >>> df.smvGroupBy(col("k")) >>> df.smvGroupBy("k") Returns: (SmvGroupedData): grouped data object """ if isinstance(cols[0], Column): keys = [ColumnHelper(c).smvGetColName() for c in cols] elif is_string(cols[0]): keys = list(cols) else: raise SmvRuntimeError("smvGroupBy does not support type: " + str(type(cols[0]))) jSgd = self._jPythonHelper.smvGroupBy(self._jdf, smv_copy_array(self._sc, *cols)) return SmvGroupedData(self.df, keys, jSgd)
[docs] def smvHashSample(self, key, rate=0.01, seed=23): """Sample the df according to the hash of a column MurmurHash3 algorithm is used for generating the hash Args: key (string or Column): column name or Column to sample on rate (double): sample rate in range (0, 1] with a default of 0.01 (1%) seed (int): random generator integer seed with a default of 23 Example: >>> df.smvHashSample(col("key"), rate=0.1, seed=123) Returns: (DataFrame): sampled DF """ if is_string(key): jkey = F.col(key)._jc elif isinstance(key, Column): jkey = key._jc else: raise RuntimeError("key parameter must be either a String or a Column") jdf = self._jDfHelper.smvHashSample(jkey, rate, seed) return DataFrame(jdf, self._sql_ctx)
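# Illustrative sketch only (not the SMV implementation; seed handling differs): Spark's
# built-in F.hash is also MurmurHash3-based, so a comparable keyed sample can be written as:
def _hash_sample_sketch(df, key, rate=0.01):
    # Keep a row when the hash of its key falls into the first `rate` fraction of 10000 buckets.
    return df.where((F.abs(F.hash(F.col(key))) % 10000) < int(rate * 10000))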
[docs] def smvJoinByKey(self, other, keys, joinType, isNullSafe=False): """Joins two DataFrames on a key The Spark `DataFrame` join operation does not handle duplicate key names. If both the left and right sides of the join contain the same key, the result `DataFrame` is unusable. The `smvJoinByKey` method will allow the user to join two `DataFrames` using the same join key. After the join, only the left-side keys will remain. In case of an outer join, `coalesce(leftkey, rightkey)` is used for the kept left key. Args: other (DataFrame): the DataFrame to join with keys (list(string)): a list of column names on which to apply the join joinType (string): choose one of ['inner', 'outer', 'leftouter', 'rightouter', 'leftsemi'] isNullSafe (boolean): if true, null keys on the left and right match each other and are kept in the output. Default False. Example: >>> df1.smvJoinByKey(df2, ["k"], "inner") >>> df_with_null_key.smvJoinByKey(df2, ["k"], "inner", True) Returns: (DataFrame): result of the join operation """ jdf = self._jPythonHelper.smvJoinByKey(self._jdf, other._jdf, _to_seq(keys), joinType, isNullSafe) return DataFrame(jdf, self._sql_ctx)
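# Illustrative sketch only: the key handling described above, in plain PySpark. The right-side
# keys are renamed before joining, then coalesced back into the kept left-side keys. Null-safe
# matching (isNullSafe) is not covered here.
def _join_by_key_sketch(left, right, keys, join_type):
    renamed = right
    for k in keys:
        renamed = renamed.withColumnRenamed(k, "_r_" + k)
    cond = [left[k] == renamed["_r_" + k] for k in keys]
    joined = left.join(renamed, cond, join_type)
    for k in keys:
        joined = joined.withColumn(k, F.coalesce(F.col(k), F.col("_r_" + k))).drop("_r_" + k)
    return joined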
[docs] def smvJoinMultipleByKey(self, keys, joinType = 'inner'): """Create a multiple-DF join builder It is used in conjunction with `joinWith` and `doJoin` Args: keys (list(string)): a list of column names on which to apply the join joinType (string): choose one of ['inner', 'outer', 'leftouter', 'rightouter', 'leftsemi'] Example: >>> df.smvJoinMultipleByKey(["k1", "k2"], "inner").joinWith(df2, "_df2").joinWith(df3, "_df3", "leftouter").doJoin() Returns: (SmvMultiJoin): the builder object for the multi join operation """ jdf = self._jPythonHelper.smvJoinMultipleByKey(self._jdf, smv_copy_array(self._sc, *keys), joinType) return SmvMultiJoin(self._sql_ctx, jdf)
[docs] def topNValsByFreq(self, n, col): """Get top N most frequent values in Column col Args: n (int): maximum number of values col (Column): which column to get values from Example: >>> df.topNValsByFreq(1, col("cid")) will return the single most frequent value in the cid column Returns: (list(object)): most frequent values (type depends on schema) """ topNdf = DataFrame(self._jDfHelper._topNValsByFreq(n, col._jc), self._sql_ctx) return [list(r)[0] for r in topNdf.collect()]
[docs] def smvSkewJoinByKey(self, other, joinType, skewVals, key): """Join that leverages broadcast (map-side) join of rows with skewed (high-frequency) values Rows keyed by skewed values are joined via broadcast join while remaining rows are joined without broadcast join. Occurrences of skewVals in df2 should be infrequent enough that the filtered table is small enough for a broadcast join. result is the union of the join results. Args: other (DataFrame): DataFrame to join with joinType (str): name of type of join (e.g. "inner") skewVals (list(object)): list of skewed values key (str): key on which to join (also the Column with the skewed values) Example: >>> df.smvSkewJoinByKey(df2, "inner", [4], "cid") will broadcast join the rows of df1 and df2 where col("cid") == "4" and join the remaining rows of df1 and df2 without broadcast join. Returns: (DataFrame): the result of the join operation """ jdf = self._jDfHelper.smvSkewJoinByKey(other._jdf, joinType, _to_seq(skewVals), key) return DataFrame(jdf, self._sql_ctx)
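# Illustrative sketch only: the general skew-join pattern described above, in plain PySpark.
# Broadcast-join the slice keyed by the skewed values, join the rest normally, then union.
def _skew_join_sketch(df, other, join_type, skew_vals, key):
    skewed = df.where(F.col(key).isin(skew_vals))
    rest = df.where(~F.col(key).isin(skew_vals))
    other_skewed = other.where(F.col(key).isin(skew_vals))
    other_rest = other.where(~F.col(key).isin(skew_vals))
    joined_skewed = skewed.join(F.broadcast(other_skewed), key, join_type)
    joined_rest = rest.join(other_rest, key, join_type)
    return joined_skewed.union(joined_rest)   # unionAll on Spark 1.x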
[docs] def smvSelectMinus(self, *cols): """Remove one or more columns from current DataFrame Args: cols (\*string or \*Column): column names or Columns to remove from the DataFrame Example: >>> df.smvSelectMinus("col1", "col2") >>> df.smvSelectMinus(col("col1"), col("col2")) Returns: (DataFrame): the resulting DataFrame after removal of columns """ jdf = self._jPythonHelper.smvSelectMinus(self._jdf, smv_copy_array(self._sc, *cols)) return DataFrame(jdf, self._sql_ctx)
[docs] def smvSelectPlus(self, *cols): """Selects all the columns in the current DataFrame plus the supplied expressions The new columns are added to the end of the current column list. Args: cols (\*Column): expressions to add to the DataFrame Example: >>> df.smvSelectPlus((col("price") * col("count")).alias("amt")) Returns: (DataFrame): the resulting DataFrame after addition of the new columns """ jdf = self._jDfHelper.smvSelectPlus(_to_seq(cols, _jcol)) return DataFrame(jdf, self._sql_ctx)
[docs] def smvPrefixFieldNames(self, prefix): """Apply a prefix to all column names in the given `DataFrame`. Args: prefix (string): prefix string to be added to all the column names Example: >>> df.smvPrefixFieldNames("x_") The above will add `x_` to the beginning of every column name in the `DataFrame`. Please note that if the renamed column names overlap with existing columns, the method will error out. Returns: (DataFrame) """ jdf = self._jDfHelper.smvPrefixFieldNames(prefix) return DataFrame(jdf, self._sql_ctx)
[docs] def smvDedupByKey(self, *keys): """Remove duplicate records from the DataFrame by arbitrarily selecting the first record from a set of records with same primary key or key combo. Args: keys (\*string or \*Column): the column names or Columns on which to apply dedup Example: input DataFrame: +-----+---------+---------+ | id | product | Company | +=====+=========+=========+ | 1 | A | C1 | +-----+---------+---------+ | 1 | C | C2 | +-----+---------+---------+ | 2 | B | C3 | +-----+---------+---------+ | 2 | B | C4 | +-----+---------+---------+ >>> df.smvDedupByKey("id") output DataFrame: +-----+---------+---------+ | id | product | Company | +=====+=========+=========+ | 1 | A | C1 | +-----+---------+---------+ | 2 | B | C3 | +-----+---------+---------+ >>> df.smvDedupByKey("id", "product") output DataFrame: +-----+---------+---------+ | id | product | Company | +=====+=========+=========+ | 1 | A | C1 | +-----+---------+---------+ | 1 | C | C2 | +-----+---------+---------+ | 2 | B | C3 | +-----+---------+---------+ Returns: (DataFrame): a DataFrame without duplicates for the specified keys """ jdf = self._jPythonHelper.smvDedupByKey(self._jdf, smv_copy_array(self._sc, *keys)) return DataFrame(jdf, self._sql_ctx)
[docs] def smvDedupByKeyWithOrder(self, *keys): """Remove duplicated records by selecting the first record relative to a given ordering The order is specified in another set of parentheses, as follows: >>> def smvDedupByKeyWithOrder(self, *keys)(*orderCols) Note: Same as the `smvDedupByKey` method, we use RDD groupBy in the implementation of this method to make sure we can handle large key space. Args: keys (\*string or \*Column): the column names or Columns on which to apply dedup Example: input DataFrame: +-----+---------+---------+ | id | product | Company | +=====+=========+=========+ | 1 | A | C1 | +-----+---------+---------+ | 1 | C | C2 | +-----+---------+---------+ | 2 | B | C3 | +-----+---------+---------+ | 2 | B | C4 | +-----+---------+---------+ >>> df.smvDedupByKeyWithOrder(col("id"))(col("product").desc()) output DataFrame: +-----+---------+---------+ | id | product | Company | +=====+=========+=========+ | 1 | C | C2 | +-----+---------+---------+ | 2 | B | C3 | +-----+---------+---------+ Returns: (DataFrame): a DataFrame without duplicates for the specified keys / order """ def _withOrder(*orderCols): jdf = self._jPythonHelper.smvDedupByKeyWithOrder(self._jdf, smv_copy_array(self._sc, *keys), smv_copy_array(self._sc, *orderCols)) return DataFrame(jdf, self._sql_ctx) return _withOrder
[docs] def smvUnion(self, *dfothers): """Unions DataFrames that may have different numbers of columns, matching columns by name and schema. Spark's unionAll matches columns by position, ignores column names & schema, and can only be performed on tables with the same number of columns. Args: dfOthers (\*DataFrame): the dataframes to union with Example: >>> df.smvUnion(df2, df3) Returns: (DataFrame): the union of all specified DataFrames """ jdf = self._jDfHelper.smvUnion(_to_seq(dfothers, _jdf)) return DataFrame(jdf, self._sql_ctx)
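# Illustrative sketch only: a simplified plain-PySpark union-by-name, padding each side with
# null columns so the schemas line up; smvUnion additionally takes the schema into account as
# described above.
def _union_by_name_sketch(df1, df2):
    all_cols = df1.columns + [c for c in df2.columns if c not in df1.columns]
    def _widen(df):
        out = df
        for c in all_cols:
            if c not in df.columns:
                out = out.withColumn(c, F.lit(None))
        return out.select(*all_cols)
    return _widen(df1).union(_widen(df2))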
[docs] def smvRenameField(self, *namePairs): """Rename one or more fields of a `DataFrame` Args: namePairs (\*tuple): tuples of strings where the first is the source column name, and the second is the target column name Example: >>> df.smvRenameField(("a", "aa"), ("c", "cc")) Returns: (DataFrame): the DataFrame with renamed fields """ jdf = self._jPythonHelper.smvRenameField(self._jdf, smv_copy_array(self._sc, *namePairs)) return DataFrame(jdf, self._sql_ctx)
[docs] def smvUnpivot(self, *cols): """Unpivot the selected columns Given a set of records with value columns, turns the value columns into value rows. Args: cols (\*string): the names of the columns to unpivot Example: input DF: +----+---+---+---+ | id | X | Y | Z | +====+===+===+===+ | 1 | A | B | C | +----+---+---+---+ | 2 | D | E | F | +----+---+---+---+ | 3 | G | H | I | +----+---+---+---+ >>> df.smvUnpivot("X", "Y", "Z") output DF: +----+--------+-------+ | id | column | value | +====+========+=======+ | 1 | X | A | +----+--------+-------+ | 1 | Y | B | +----+--------+-------+ | 1 | Z | C | +----+--------+-------+ | ...| ... | ... | +----+--------+-------+ | 3 | Y | H | +----+--------+-------+ | 3 | Z | I | +----+--------+-------+ Returns: (DataFrame): the unpivoted DataFrame """ jdf = self._jDfHelper.smvUnpivot(_to_seq(cols)) return DataFrame(jdf, self._sql_ctx)
[docs] def smvUnpivotRegex(self, cols, colNameFn, indexColName): """Unpivot the selected columns using the specified regex Args: cols (\*string): the names of the columns to unpivot colNameFn (string): a regex representing the function to be applied when unpivoting indexColName (string): the name of the index column to be created Example: input DF: +----+-------+-------+-------+-------+ | id | A_1 | A_2 | B_1 | B_2 | +====+=======+=======+=======+=======+ | 1 | 1_a_1 | 1_a_2 | 1_b_1 | 1_b_2 | +----+-------+-------+-------+-------+ | 2 | 2_a_1 | 2_a_2 | 2_b_1 | 2_b_2 | +----+-------+-------+-------+-------+ >>> df.smvUnpivotRegex( ["A_1", "A_2", "B_1", "B_2"], "(.*)_(.*)", "index" ) output DF: +----+-------+-------+-------+ | id | index | A | B | +====+=======+=======+=======+ | 1 | 1 | 1_a_1 | 1_b_1 | +----+-------+-------+-------+ | 1 | 2 | 1_a_2 | 1_b_2 | +----+-------+-------+-------+ | 2 | 1 | 2_a_1 | 2_b_1 | +----+-------+-------+-------+ | 2 | 2 | 2_a_2 | 2_b_2 | +----+-------+-------+-------+ Returns: (DataFrame): the unpivoted DataFrame """ jdf = self._jDfHelper.smvUnpivotRegex(_to_seq(cols), colNameFn, indexColName) return DataFrame(jdf, self._sql_ctx)
[docs] def smvExportCsv(self, path, n=None): """Export DataFrame to the local file system Args: path (string): relative path to the app running directory on the local file system (instead of HDFS) n (integer): optional. number of records to export. default is all records Note: Since we have to collect the DF and then call Java file operations, the job has to be launched in either local or yarn-client mode. Also it is the user's responsibility to make sure that the DF is small enough to fit into the local file system. Example: >>> df.smvExportCsv("./target/python-test-export-csv.csv") Returns: (None) """ self._jDfHelper.smvExportCsv(path, n)
[docs] def smvOverlapCheck(self, keyColName): """For a set of DFs, which share the same key column, check the overlap across them The other DataFrames are specified in another set of parentheses, as follows: >>> df1.smvOverlapCheck(key)(*df) Args: keyColName (string): the column name for the key column Examples: >>> df1.smvOverlapCheck("key")(df2, df3, df4) output DF has 2 columns: - key - flag: a bit-string, e.g. 0110. Each bit represents whether the original DF has this key It can be used with EDD to summarize on the flag: >>> df1.smvOverlapCheck("key")(df2, df3).smvHist("flag") Returns: (DataFrame): the DataFrame with the key and flag columns """ def _check(*dfothers): jdf = self._jPythonHelper.smvOverlapCheck(self._jdf, keyColName, smv_copy_array(self._sc, *dfothers)) return DataFrame(jdf, self._sql_ctx) return _check
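# Illustrative sketch only: how the presence flag described above could be built in plain
# PySpark. Each input DataFrame contributes one character, "1" if the key is present and
# "0" otherwise.
def _overlap_check_sketch(key, *dfs):
    from functools import reduce
    tagged = [d.select(key).distinct().withColumn("_in_%d" % i, F.lit("1"))
              for i, d in enumerate(dfs)]
    joined = reduce(lambda a, b: a.join(b, on=key, how="outer"), tagged)
    flags = [F.coalesce(F.col("_in_%d" % i), F.lit("0")) for i in range(len(dfs))]
    return joined.select(key, F.concat(*flags).alias("flag"))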
[docs] def smvDesc(self, *colDescs): """Adds column descriptions Args: colDescs (\*tuple): tuples of strings where the first is the column name, and the second is the description to add Example: >>> df.smvDesc(("a", "description of col a"), ("b", "description of col b")) Returns: (DataFrame): the DataFrame with column descriptions added """ return self._SchemaMetaOps.addDesc(*colDescs)
[docs] def smvDescFromDF(self, descDF): """Adds column descriptions Args: descDF (DataFrame): a companion 2-column description DataFrame that has variable names in column 1 and corresponding variable descriptions in column 2 Example: >>> df.smvDescFromDF(descriptionDF) Returns: (DataFrame): the DataFrame with column descriptions added """ desclist = [(str(r[0]), str(r[1])) for r in descDF.collect()] return self.smvDesc(*desclist)
[docs] def smvGetDesc(self, colName = None): """Returns column description(s) Args: colName (string): optional column name for which to get the description. Example: >>> df.smvGetDesc("col_a") >>> df.smvGetDesc() Returns: (string): description string of colName, if specified or: Returns: (list(tuple)): a list of (colName, description) pairs for all columns """ return self._SchemaMetaOps.getDesc(colName)
[docs] def smvRemoveDesc(self, *colNames): """Removes descriptions for the given columns from the DataFrame Args: colNames (\*string): names of columns for which to remove the description Example: >>> df.smvRemoveDesc("col_a", "col_b") >>> df.smvRemoveDesc() Returns: (DataFrame): the DataFrame with column descriptions removed or: Returns: (DataFrame): the DataFrame with all column descriptions removed """ return self._SchemaMetaOps.removeDesc(*colNames)
[docs] def smvGetLabel(self, colName = None): """Returns a list of column label(s) Args: colName (string): optional column name for which to get the label. Example: >>> df.smvGetLabel("col_a") >>> df.smvGetLabel() Returns: (list(string)): a list of label strings of colName, if specified or: Returns: (list(tuple)): a list of (colName, list(labels)) pairs for all columns """ return self._SchemaMetaOps.getLabel(colName)
[docs] def smvLabel(self, colNames, labels): """Adds labels to the specified columns A column may have multiple labels. Adding the same label twice to a column has the same effect as adding that label once. For multiple colNames, the same set of labels will be added to all of them. When colNames is empty, the set of labels will be added to all columns of the df. labels parameters must be non-empty. Args: labels: (list(string)) a list of label strings to add colNames: (list(string)) list of names of columns for which to add labels Example: >>> df.smvLabel(["col_a", "col_b", "col_c"], ["tag_1", "tag_2"]) >>> df.smvLabel([], ["tag_1", "tag_2"]) Returns: (DataFrame): the DataFrame with labels added to the specified columns or: Returns: (DataFrame): the DataFrame with labels added to all columns """ return self._SchemaMetaOps.addLabel(colNames, labels)
[docs] def smvRemoveLabel(self, colNames = None, labels = None): """Removes labels from the specified columns For multiple colNames, the same set of labels will be removed from all of them. When colNames is empty, the set of labels will be removed from all columns of the df. When labels is empty, all labels will be removed from the given columns. If neither columns nor labels are specified, i.e. both parameter lists are empty, then all labels are removed from all columns in the data frame, essentially clearing the label meta data. Args: labels: (list(string)) a list of label strings to remove colNames: (list(string)) list of names of columns for which to remove labels Example: >>> df.smvRemoveLabel(["col_a"], ["tag_1"]) >>> df.smvRemoveLabel() Returns: (DataFrame): the DataFrame with specified labels removed from the specified columns or: Returns: (DataFrame): the DataFrame with all label meta data cleared """ return self._SchemaMetaOps.removeLabel(colNames, labels)
[docs] def smvWithLabel(self, labels = None): """Returns all column names in the data frame that contain all the specified labels If the labels is empty, returns all unlabeled columns in the data frame. Will throw if there are no columns that satisfy the condition. Args: labels: (list(string)) a list of label strings for the columns to match Example: >>> df.smvWithLabel(["tag_1", "tag_2"]) Returns: (list(string)): a list of column name strings that match the specified labels """ return self._SchemaMetaOps.colsWithLabel(labels)
[docs] def selectByLabel(self, labels = None): """Select columns whose metadata contains the specified labels If the labels is empty, returns a DataFrame with all the unlabeled columns. Will throw if there are no columns that satisfy the condition. Args: labels: (list(string)) a list of label strings for the columns to match Example: >>> df.selectByLabel(["tag_1"]) Returns: (DataFrame): the DataFrame with the selected columns """ return self.df.select(self.smvWithLabel(labels))
############################################# # DF helpers which print to STDOUT # Printing to STDOUT from the Scala side does not work in Jupyter, so the report string is passed to the Python side and printed here ############################################# def _println(self, string): sys.stdout.write(string + "\n") def _printFile(self, f, str): tgt = open(f, "w") tgt.write(str + "\n") tgt.close() def _peekStr(self, pos = 1, colRegex = ".*"): return self._jPythonHelper.peekStr(self._jdf, pos, colRegex)
[docs] def peek(self, pos = 1, colRegex = ".*"): """Display a DataFrame row in transposed view Args: pos (integer): the n-th row to display, default as 1 colRegex (string): show the columns with name matching the regex, default as ".*" Returns: (None) """ self._println(self._peekStr(pos, colRegex))
[docs] def peekSave(self, path, pos = 1, colRegex = ".*"): """Write `peek` result to a file Args: path (string): local file name to write to pos (integer): the n-th row to display, default as 1 colRegex (string): show the columns with name matching the regex, default as ".*" Returns: (None) """ self._printFile(path, self._peekStr(pos, colRegex))
def _smvEdd(self, *cols): return self._jDfHelper._smvEdd(_to_seq(cols)).createReport()
[docs] def smvEdd(self, *cols): """Display EDD summary Args: cols (\*string): column names on which to perform EDD summary Example: >>> df.smvEdd("a", "b") Returns: (None) """ self._println(self._smvEdd(*cols))
def _smvHist(self, *cols): return self._jDfHelper._smvHist(_to_seq(cols)).createReport()
[docs] def smvHist(self, *cols): """Display EDD histogram Each column's histogram prints separately Args: cols (\*string): The columns on which to perform EDD histogram Example: >>> df.smvHist("a") Returns: (None) """ self._println(self._smvHist(*cols))
def _smvConcatHist(self, *cols): return self._jPythonHelper.smvConcatHist(self._jdf, smv_copy_array(self._sc, *cols)).createReport()
[docs] def smvConcatHist(self, *cols): """Display EDD histogram of a group of columns (joint distribution) Args: cols (\*string): The columns on which to perform EDD histogram Example: >>> df.smvConcatHist("a", "b") Returns: (None) """ self._println(self._smvConcatHist(*cols))
def _smvFreqHist(self, *cols): return self._jDfHelper._smvFreqHist(_to_seq(cols)).createReport()
[docs] def smvFreqHist(self, *cols): """Print EDD histogram with frequency sorting Args: cols (\*string): The columns on which to perform EDD histogram Example: >>> df.smvFreqHist("a") Returns: (None) """ self._println(self._smvFreqHist(*cols))
def _smvCountHist(self, keys, binSize): if is_string(keys): res = self._jDfHelper._smvCountHist(_to_seq([keys]), binSize) else: res = self._jDfHelper._smvCountHist(_to_seq(keys), binSize) return res.createReport()
[docs] def smvCountHist(self, keys, binSize): """Print the distribution of the value frequency on specific columns Args: keys (list(string)): the column names on which the EDD histogram is performed binSize (integer): the bin size for the histogram Example: >>> df.smvCountHist(["k"], 1) Returns: (None) """ self._println(self._smvCountHist(keys, binSize))
def _smvBinHist(self, *colWithBin): for elem in colWithBin: assert type(elem) is tuple, "smvBinHist takes a list of tuple(string, double) as parameter" assert len(elem) == 2, "smvBinHist takes a list of tuple(string, double) as parameter" insureDouble = map(lambda t: (t[0], t[1] * 1.0), colWithBin) return self._jPythonHelper.smvBinHist(self._jdf, smv_copy_array(self._sc, *insureDouble)).createReport()
[docs] def smvBinHist(self, *colWithBin): """Print distributions on numerical columns with applying the specified bin size Args: colWithBin (\*tuple): each tuple must be of size 2, where the first element is the column name, and the second is the bin size for that column Example: >>> df.smvBinHist(("col_a", 1)) Returns: (None) """ self._println(self._smvBinHist(*colWithBin))
def _smvEddCompare(self, df2, ignoreColName): return self._jDfHelper._smvEddCompare(df2._jdf, ignoreColName)
[docs] def smvEddCompare(self, df2, ignoreColName): """Compare 2 DFs by comparing their EDD summary results The result of the comparison is printed out. Args: df2 (DataFrame): the DataFrame to compare with ignoreColName (boolean): specifies whether to ignore column names, default is false Example: >>> df.smvEddCompare(df2) >>> df.smvEddCompare(df2, True) Returns: (None) """ self._println(self._smvEddCompare(df2, ignoreColName))
def _smvDiscoverPK(self, n): pk = self._jPythonHelper.smvDiscoverPK(self._jdf, n) return "[{}], {}".format(", ".join(map(str, pk._1())), pk._2())
[docs] def smvDiscoverPK(self, n=10000): """Find a column combination which uniquely identifies a row from the data The resulting output is printed out Note: The algorithm only looks for a set of keys which uniquely identifies the row. There could be more key combinations which can also be the primary key. Args: n (integer): number of rows the PK discovery algorithm will run on, defaults to 10000 Example: >>> df.smvDiscoverPK(5000) Returns: (None) """ self._println(self._smvDiscoverPK(n))
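# Illustrative sketch only (the real implementation lives on the Scala side): a brute-force
# search for a unique key combination on a sample, in the spirit of smvDiscoverPK above.
def _discover_pk_sketch(df, n=10000):
    from itertools import combinations
    sample = df.limit(n).cache()
    total = sample.count()
    # Try smaller column combinations first; return the first one that is unique on the sample.
    for size in range(1, len(sample.columns) + 1):
        for cols in combinations(sample.columns, size):
            if sample.select(*cols).distinct().count() == total:
                return list(cols)
    return []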
[docs] def smvDupeCheck(self, keys, n=10000): """For a given list of potential keys, check for duplicated records, reporting the number of duplications along with all the columns. Null values are allowed in the potential keys, so duplication on null-valued keys will also be reported. Args: keys (list(string)): the key column list on which the duplicate check is applied n (integer): number of rows from input data for checking duplications, defaults to 10000 Returns: (DataFrame): the key columns + a "_N" column + the remaining columns, for the records whose key values are duplicated, where "_N" holds the count of duplications of that record's key values """ dfTopN = self.df.limit(n).cache() res = dfTopN.groupBy(*keys)\ .agg(F.count(F.lit(1)).alias('_N'))\ .where(F.col('_N') > 1)\ .smvJoinByKey(dfTopN, keys, 'inner', True)\ .orderBy(*keys) dfTopN.unpersist() return res
[docs] def smvDumpDF(self): """Dump the schema and data of given df to screen for debugging purposes Similar to `show()` method of DF from Spark 1.3, although the format is slightly different. This function's format is more convenient for us and hence has remained. Example: >>> df.smvDumpDF() Returns: (None) """ self._println(self._jDfHelper._smvDumpDF())
[docs]class ColumnHelper(object): def __init__(self, col): self.col = col self._jc = col._jc self._jvm = _sparkContext()._jvm self._jPythonHelper = self._jvm.SmvPythonHelper self._jColumnHelper = self._jvm.ColumnHelper(self._jc)
[docs] def smvGetColName(self): """Returns the name of a Column as a string Example: >>> df.a.smvGetColName() Returns: (str) """ return self._jColumnHelper.getName()
[docs] def smvIsAllIn(self, *vals): """Returns true if ALL of the Array columns' elements are in the given parameter sequence Args: vals (\*any): vals must be of the same type as the Array content Example: input DF: +---+---+ | k | v | +===+===+ | a | b | +---+---+ | c | d | +---+---+ | | | +---+---+ >>> df.select(array(col("k"), col("v")).smvIsAllIn("a", "b", "c").alias("isFound")) output DF: +---------+ | isFound | +=========+ | true | +---------+ | false | +---------+ | false | +---------+ Returns: (Column): BooleanType """ jc = self._jPythonHelper.smvIsAllIn(self._jc, _to_seq(vals)) return Column(jc)
[docs] def smvIsAnyIn(self, *vals): """Returns true if ANY one of the Array columns' elements are in the given parameter sequence Args: vals (\*any): vals must be of the same type as the Array content Example: input DF: +---+---+ | k | v | +===+===+ | a | b | +---+---+ | c | d | +---+---+ | | | +---+---+ >>> df.select(array(col("k"), col("v")).smvIsAnyIn("a", "b", "c").alias("isFound")) output DF: +---------+ | isFound | +=========+ | true | +---------+ | true | +---------+ | false | +---------+ Returns: (Column): BooleanType """ jc = self._jPythonHelper.smvIsAnyIn(self._jc, _to_seq(vals)) return Column(jc)
[docs] def smvMonth(self): """Extract month component from a timestamp Example: >>> df.select(col("dob").smvMonth()) Returns: (Column): IntegerType. Month component as integer, or null if input column is null """ jc = self._jColumnHelper.smvMonth() return Column(jc)
[docs] def smvYear(self): """Extract year component from a timestamp Example: >>> df.select(col("dob").smvYear()) Returns: (Column): IntegerType. Year component as integer, or null if input column is null """ jc = self._jColumnHelper.smvYear() return Column(jc)
[docs] def smvQuarter(self): """Extract quarter component from a timestamp Example: >>> df.select(col("dob").smvQuarter()) Returns: (Column): IntegerType. Quarter component as integer (1-based), or null if input column is null """ jc = self._jColumnHelper.smvQuarter() return Column(jc)
[docs] def smvDayOfMonth(self): """Extract day of month component from a timestamp Example: >>> df.select(col("dob").smvDayOfMonth()) Returns: (Column): IntegerType. Day of month component as integer (range 1-31), or null if input column is null """ jc = self._jColumnHelper.smvDayOfMonth() return Column(jc)
[docs] def smvDayOfWeek(self): """Extract day of week component from a timestamp Example: >>> df.select(col("dob").smvDayOfWeek()) Returns: (Column): IntegerType. Day of week component as integer (range 1-7, 1 being Monday), or null if input column is null """ jc = self._jColumnHelper.smvDayOfWeek() return Column(jc)
[docs] def smvHour(self): """Extract hour component from a timestamp Example: >>> df.select(col("dob").smvHour()) Returns: (Column): IntegerType. Hour component as integer, or null if input column is null """ jc = self._jColumnHelper.smvHour() return Column(jc)
[docs] def smvPlusDays(self, delta): """Add N days to a `Timestamp` or `Date` column Args: delta (int or Column): the number of days to add Example: >>> df.select(col("dob").smvPlusDays(3)) Returns: (Column): TimestampType. The incremented Timestamp, or null if input is null. **Note** even if the input is DateType, the output is TimestampType Please note that although Spark's `date_add` function does something similar, the two are actually different. - Both can act on both `Timestamp` and `Date` types - `smvPlusDays` always returns `Timestamp`, while `F.date_add` always returns `Date` """ if (isinstance(delta, int)): jdelta = delta elif (isinstance(delta, Column)): jdelta = delta._jc else: raise RuntimeError("delta parameter must be either an int or a Column") jc = self._jColumnHelper.smvPlusDays(jdelta) return Column(jc)
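# Illustrative sketch only: the difference called out in the note above, shown with plain
# Spark functions. F.date_add yields a DateType column, while interval arithmetic on a
# timestamp keeps TimestampType.
def _plus_days_sketch(df, ts_col, n_days):
    as_date = F.date_add(F.col(ts_col), n_days)                  # DateType result
    as_timestamp = F.col(ts_col).cast("timestamp") + F.expr("INTERVAL %d DAYS" % n_days)
    return df.select(as_date.alias("as_date"), as_timestamp.alias("as_timestamp"))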
[docs] def smvPlusWeeks(self, delta): """Add N weeks to `Timestamp` or `Date` column Args: delta (int or Column): the number of weeks to add Example: >>> df.select(col("dob").smvPlusWeeks(3)) Returns: (Column): TimestampType. The incremented Timestamp, or null if input is null. **Note** even if the input is DateType, the output is TimestampType """ if (isinstance(delta, int)): jdelta = delta elif (isinstance(delta, Column)): jdelta = delta._jc else: raise RuntimeError("delta parameter must be either an int or a Column") jc = self._jColumnHelper.smvPlusWeeks(jdelta) return Column(jc)
[docs] def smvPlusMonths(self, delta): """Add N months to a `Timestamp` or `Date` column Args: delta (int or Column): the number of months to add Note: The calculation will do its best to only change the month field, retaining the same day of month. However, in certain circumstances, it may be necessary to alter smaller fields. For example, 2007-03-31 plus one month cannot result in 2007-04-31, so the day of month is adjusted to 2007-04-30. Example: >>> df.select(col("dob").smvPlusMonths(3)) Returns: (Column): TimestampType. The incremented Timestamp, or null if input is null. **Note** even if the input is DateType, the output is TimestampType Please note that although Spark's `add_months` function does something similar, the two are actually different. - Both can act on both `Timestamp` and `Date` types - `smvPlusMonths` always returns `Timestamp`, while `F.add_months` always returns `Date` """ if (isinstance(delta, int)): jdelta = delta elif (isinstance(delta, Column)): jdelta = delta._jc else: raise RuntimeError("delta parameter must be either an int or a Column") jc = self._jColumnHelper.smvPlusMonths(jdelta) return Column(jc)
[docs] def smvPlusYears(self, delta): """Add N years to `Timestamp` or `Date` column Args: delta (int or Column): the number of years to add Example: >>> df.select(col("dob").smvPlusYears(3)) Returns: (Column): TimestampType. The incremented Timestamp, or null if input is null. **Note** even if the input is DateType, the output is TimestampType """ if (isinstance(delta, int)): jdelta = delta elif (isinstance(delta, Column)): jdelta = delta._jc else: raise RuntimeError("delta parameter must be either an int or a Column") jc = self._jColumnHelper.smvPlusYears(jdelta) return Column(jc)
[docs] def smvStrToTimestamp(self, fmt): """Build a timestamp from a string Args: fmt (string): the format is the same as the Java `Date` format Example: >>> df.select(col("dob").smvStrToTimestamp("yyyy-MM-dd")) Returns: (Column): TimestampType. The converted Timestamp """ jc = self._jColumnHelper.smvStrToTimestamp(fmt) return Column(jc)
[docs] def smvTimestampToStr(self, timezone, fmt): """Build a string from a timestamp and timezone Args: timezone (string or Column): the timezone follows the rules in https://www.joda.org/joda-time/apidocs/org/joda/time/DateTimeZone.html#forID-java.lang.String- It can be a string like "America/Los_Angeles" or "+1000". If it is null, the current system time zone is used. fmt (string): the format is the same as the Java `Date` format Example: >>> df.select(col("ts").smvTimestampToStr("America/Los_Angeles","yyyy-MM-dd HH:mm:ss")) Returns: (Column): StringType. The converted String with given format """ if is_string(timezone): jtimezone = timezone elif isinstance(timezone, Column): jtimezone = timezone._jc else: raise RuntimeError("timezone parameter must be either a string or a Column") jc = self._jColumnHelper.smvTimestampToStr(jtimezone, fmt) return Column(jc)
[docs] def smvDay70(self): """Convert a Timestamp to the number of days from 1970-01-01 Example: >>> df.select(col("dob").smvDay70()) Returns: (Column): IntegerType. Number of days from 1970-01-01 (start from 0) """ jc = self._jColumnHelper.smvDay70() return Column(jc)
[docs] def smvMonth70(self): """Convert a Timestamp to the number of months from 1970-01-01 Example: >>> df.select(col("dob").smvMonth70()) Returns: (Column): IntegerType. Number of months from 1970-01-01 (start from 0) """ jc = self._jColumnHelper.smvMonth70() return Column(jc)
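# Illustrative sketch only: rough plain-PySpark counterparts of the two epoch-offset helpers
# above; timezone handling may differ from the SMV implementation.
def _epoch_offset_sketch(df, date_col):
    day70 = F.datediff(F.col(date_col), F.lit("1970-01-01"))
    month70 = F.floor(F.months_between(F.col(date_col), F.lit("1970-01-01"))).cast("integer")
    return df.select(day70.alias("day70"), month70.alias("month70"))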
[docs] def smvTimeToType(self): """smvTime helper to convert `smvTime` column to time type string Example `smvTime` values (as String): "Q201301", "M201512", "D20141201" Example output type "quarter", "month", "day" """ jc = self._jColumnHelper.smvTimeToType() return Column(jc)
[docs] def smvTimeToIndex(self): """smvTime helper to convert `smvTime` column to time index integer Example `smvTime` values (as String): "Q201301", "M201512", "D20141201" Example output 172, 551, 16405 (# of quarters, months, and days from 19700101) """ jc = self._jColumnHelper.smvTimeToIndex() return Column(jc)
[docs] def smvTimeToLabel(self): """smvTime helper to convert `smvTime` column to human readable form Example `smvTime` values (as String): "Q201301", "M201512", "D20141201" Example output "2013-Q1", "2015-12", "2014-12-01" """ jc = self._jColumnHelper.smvTimeToLabel() return Column(jc)
[docs] def smvTimeToTimestamp(self): """smvTime helper to convert `smvTime` column to a timestamp at the beginning of the given time period. Example `smvTime` values (as String): "Q201301", "M201512", "D20141201" Example output "2013-01-01 00:00:00.0", "2015-12-01 00:00:00.0", "2014-12-01 00:00:00.0" """ jc = self._jColumnHelper.smvTimeToTimestamp() return Column(jc)
[docs] def smvArrayFlatten(self, elemType): """smvArrayFlatten helper applies the flatten operation on an Array of Array column. Example: >>> df.select(col('arrayOfArrayOfStr').smvArrayFlatten(StringType())) Args: elemType (DataType or DataFrame): the array element's data type, given as a DataType object, or a DataFrame from which to infer the element data type """ if(isinstance(elemType, DataType)): elemTypeJson = elemType.json() elif(isinstance(elemType, DataFrame)): elemTypeJson = elemType.select(self.col)\ .schema.fields[0].dataType.elementType.elementType.json() else: raise SmvRuntimeError("smvArrayFlatten does not support type: {}".format(type(elemType))) jc = self._jColumnHelper.smvArrayFlatten(elemTypeJson) return Column(jc)
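# Illustrative note: on Spark 2.4+ the built-in flatten function gives a similar result for an
# array-of-arrays column, without needing the element type.
def _array_flatten_sketch(df, array_col):
    return df.select(F.flatten(F.col(array_col)).alias(array_col))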
# Initialize DataFrame and Column with helper methods. Called by SmvApp.
[docs]def init_helpers(): _helpCls(Column, ColumnHelper) _helpCls(DataFrame, DataFrameHelper)