Source code for smv.schema_meta_ops
#
# This file is licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SMV Schema Meta Operations
Provides helper functions for SmvDesc and SmvLabel operations
"""
import json
from pyspark.sql import DataFrame
from smv.error import SmvRuntimeError
smv_label = "smvLabel"
smv_desc = "smvDesc"
def _getMetaDesc(meta):
return meta.get(smv_desc, u'')
def _getMetaLabels(meta):
return meta.get(smv_label, [])
def _setMetaDesc(meta, desc):
meta[smv_desc] = desc
return meta
def _removeMetaDesc(meta):
meta.pop(smv_desc, None)
return meta
def _setMetaLabel(meta, labels):
meta[smv_label] = list(set(_getMetaLabels(meta)) | set(labels))
return meta
def _removeMetaLabel(meta, labels):
meta[smv_label] = [] if not labels else list(set(_getMetaLabels(meta)) - set(labels))
return meta
[docs]class SchemaMetaOps(object):
def __init__(self, df):
self.df = df
self.jdf = df._jdf
self.fields = df.schema.fields
self._sql_ctx = df.sql_ctx
self._jPythonHelper = df._sc._jvm.SmvPythonHelper
def _getMetaByName(self, colName):
"""Returns the metadata of the first column that matches the column name
Will throw if there's no column matching the specified name
Args:
colName (string) the name of the column that is being looked for
Returns:
(dict) the metadata of the given column
"""
try:
meta = next(col.metadata for col in self.fields if col.name == colName)
except StopIteration:
raise SmvRuntimeError("column name {} not found".format(colName))
return meta
def _updateColMeta(self, colShouldUpdate, colUpdateMeta):
"""Returns a DataFrame with the specified meta data updated in specified columns
Args:
colShouldUpdate (function):
input - (dict) a DataFrame column
output - (bool) whether we should update the given column
colUpdateMeta (function):
input - (dict) a DataFrame column
output - (dict) the metadata that should be assigned to the given column
Returns:
(DataFrame) a new DataFrame containing the updated metadata
"""
cols = [col for col in self.fields if colShouldUpdate(col)]
colMeta = [(col.name, json.dumps(colUpdateMeta(col))) for col in cols]
jdf = self._jPythonHelper.smvColMeta(self.jdf, colMeta)
return DataFrame(jdf, self._sql_ctx)
def _checkColExistence(self, colNames):
"""Check if the given column names exist in the DataFrame
Will throw if some of the column names are not found
Args:
colNames (list(string)) a list of column names to check
"""
invalidCols = set(colNames) - set(self.df.columns)
if invalidCols:
raise SmvRuntimeError("{} does not have columns {}".format(self.df, ", ".join(invalidCols)))
[docs] def getDesc(self, colName):
"""Returns column description(s)
If colName is empty, returns descriptions for all columns
"""
if colName is None:
return [(col.name, _getMetaDesc(col.metadata)) for col in self.fields]
return _getMetaDesc(self._getMetaByName(colName))
[docs] def getLabel(self, colName):
"""Returns a list of column label(s)
If colName is empty, returns labels for all columns
"""
if colName is None:
return [(col.name, _getMetaLabels(col.metadata)) for col in self.fields]
return _getMetaLabels(self._getMetaByName(colName))
[docs] def addDesc(self, *colDescs):
"""Adds column descriptions
"""
if not colDescs:
raise SmvRuntimeError("must provide (name, description) pair to add")
self._checkColExistence([tup[0] for tup in colDescs])
addDict = dict(colDescs)
def colShouldUpdate(col):
return col.name in addDict
def colUpdateMeta(col):
return _setMetaDesc(col.metadata, addDict[col.name])
return self._updateColMeta(colShouldUpdate, colUpdateMeta)
[docs] def removeDesc(self, *colNames):
"""Removes description for the given columns from the Dataframe
If colNames are empty, removes descriptions of all columns
"""
if colNames:
self._checkColExistence(colNames)
removeSet = set(colNames)
def colShouldUpdate(col):
return not colNames or col.name in removeSet
def colUpdateMeta(col):
return _removeMetaDesc(col.metadata)
return self._updateColMeta(colShouldUpdate, colUpdateMeta)
[docs] def addLabel(self, colNames, labels):
"""Adds labels to the specified columns
If colNames are empty, adds the same set of labels to all columns
"""
if not labels:
raise SmvRuntimeError("must provide a list of labels to add")
if colNames:
self._checkColExistence(colNames)
addSet = set(colNames)
def colShouldUpdate(col):
return not colNames or col.name in addSet
def colUpdateMeta(col):
return _setMetaLabel(col.metadata, labels)
return self._updateColMeta(colShouldUpdate, colUpdateMeta)
[docs] def removeLabel(self, colNames = None, labels = None):
"""Removes labels from the specified columns
If colNames are empty, removes the same set of labels from all columns
If labels are empty, removes all labels from the given columns
If they are both empty, removes all labels from all columns
"""
if colNames:
self._checkColExistence(colNames)
removeSet = set(colNames)
def colShouldUpdate(col):
return not colNames or col.name in removeSet
def colUpdateMeta(col):
return _removeMetaLabel(col.metadata, labels)
return self._updateColMeta(colShouldUpdate, colUpdateMeta)
[docs] def colsWithLabel(self, labels = None):
"""Returns all column names in the data frame that contain all the specified labels
If labels are empty, returns names of unlabeled columns
"""
def metaLabelMatched(meta):
if labels:
# if labels are provided, match the column whose labels contain the given ones
return set(labels) <= set(_getMetaLabels(meta))
else:
# if labels are empty, match the column with no label
return not _getMetaLabels(meta)
ret = [col.name for col in self.fields if metaLabelMatched(col.metadata)]
if not ret:
if labels:
raise SmvRuntimeError("there are no columns labeled with {{{}}} in {}"\
.format(", ".join(labels), self.df))
else:
raise SmvRuntimeError("there are no unlabeled columns in {}"\
.format(self.df))
return ret