Source code for smv.smvapp

#
# This file is licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""SmvApp entry class
This module provides the main SMV Python entry point ``SmvPy`` class and a singleton `smvApp`.
It is equivalent to ``SmvApp`` on Scala side
"""
from datetime import datetime
import os
import sys
import re
import json
import pkgutil
from collections import namedtuple

from py4j.java_gateway import java_import, JavaObject
from pyspark.java_gateway import launch_gateway
from pyspark import SparkContext
from pyspark.sql import SparkSession, DataFrame


from smv.datasetmgr import DataSetMgr
from smv.smvappinfo import SmvAppInfo
from smv.datasetrepo import DataSetRepoFactory, DataSetRepo
from smv.utils import smv_copy_array, check_socket, scala_seq_to_list
from smv.error import SmvRuntimeError, SmvDqmValidationError
import smv.helpers
from smv.runinfo import SmvRunInfoCollector
from smv.modulesvisitor import ModulesVisitor
from smv.smvmodulerunner import SmvModuleRunner
from smv.smvconfig import SmvConfig
from smv.smviostrategy import SmvJsonOnHdfsPersistenceStrategy
from smv.conn import SmvHdfsConnectionInfo
from smv.smvmetadata import SmvMetaHistory
from smv.smvhdfs import SmvHDFS
from py4j.protocol import Py4JJavaError



class SmvApp(object):
    """The Python representation of SMV.

        Its singleton instance is created later in the containing module
        and is named smvApp.

        Adds `java_imports` to the namespace in the JVM gateway in
        SparkContext (in pyspark). It also creates an instance of
        SmvPyClient.
    """

    # Singleton instance of SmvApp
    _instance = None

    # default rel path for python sources from appDir
    SRC_PROJECT_PATH = "src/main/python"
    # default location for py UDL's in smv projects
    SRC_LIB_PATH = "library"
    # prefix which is under SRC_LIB_PATH but should be excluded from userlib
    SEMI_PRIVATE_LIB_PREFIX = "dsalib"
    @classmethod
    def getInstance(cls):
        if cls._instance is None:
            raise SmvRuntimeError("An instance of SmvApp has not been created")
        else:
            return cls._instance

    @classmethod
    def createInstance(cls, arglist, _sparkSession, py_module_hotload=True):
        """Create the singleton instance. Also returns the instance."""
        cls._instance = cls(arglist, _sparkSession, py_module_hotload)
        return cls._instance

    @classmethod
    def setInstance(cls, app):
        """Set the singleton instance."""
        cls._instance = app
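    # A minimal bootstrap sketch (hypothetical; assumes a live SparkSession,
    # SMV_HOME set, and `arglist` holding SMV command-line arguments):
    #
    #     from pyspark.sql import SparkSession
    #     spark = SparkSession.builder.getOrCreate()
    #     app = SmvApp.createInstance(arglist, spark)
    #     assert SmvApp.getInstance() is app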
    def __init__(self, arglist, _sparkSession, py_module_hotload=True):
        self.smvHome = os.environ.get("SMV_HOME")
        if (self.smvHome is None):
            raise SmvRuntimeError("SMV_HOME env variable not set!")

        self.sparkSession = _sparkSession

        if (self.sparkSession is not None):
            sc = self.sparkSession.sparkContext
            sc.setLogLevel("ERROR")

            self.sc = sc
            self.sqlContext = self.sparkSession._wrapped
            self._jvm = sc._jvm

            self.j_smvPyClient = self._jvm.org.tresamigos.smv.python.SmvPyClientFactory.init(self.sparkSession._jsparkSession)
            self.j_smvApp = self.j_smvPyClient.j_smvApp()
        else:
            _gw = launch_gateway(None)
            self._jvm = _gw.jvm

        self.py_module_hotload = py_module_hotload

        java_import(self._jvm, "org.tresamigos.smv.ColumnHelper")
        java_import(self._jvm, "org.tresamigos.smv.SmvDFHelper")
        java_import(self._jvm, "org.tresamigos.smv.dqm.*")
        java_import(self._jvm, "org.tresamigos.smv.panel.*")
        java_import(self._jvm, "org.tresamigos.smv.python.SmvPythonHelper")
        java_import(self._jvm, "org.tresamigos.smv.SmvHDFS")
        java_import(self._jvm, "org.tresamigos.smv.DfCreator")

        self.smvSchemaObj = self._jvm.SmvPythonHelper.getSmvSchema()

        self.py_smvconf = SmvConfig(arglist, self._jvm)

        # configure spark sql params
        if (self.sparkSession is not None):
            for k, v in self.py_smvconf.spark_sql_props().items():
                self.sqlContext.setConf(k, v)

        # issue #429: set application name from smv config
        if (self.sparkSession is not None):
            sc._conf.setAppName(self.appName())

        # CmdLine is static, so can be an attribute
        cl = self.py_smvconf.cmdline
        self.cmd_line = namedtuple("CmdLine", cl.keys())(*cl.values())

        # shortcut is meant for internal use only
        self.dsm = DataSetMgr(self._jvm, self.py_smvconf)

        # computed df cache, keyed by m.versioned_fqn
        self.data_cache = {}

        # AFTER app is available but BEFORE stages, use the dynamically
        # configured app dir to set the source path and library path
        self.prependDefaultDirs()

        self.repoFactory = DataSetRepoFactory(self)
        self.dsm.register(self.repoFactory)

        # provider cache, keyed by providers' fqn
        self.provider_cache = {}
        self.refresh_provider_cache()

        # Initialize DataFrame and Column with helper methods
        smv.helpers.init_helpers()
    def smvVersion(self):
        versionFile = self.smvHome + "/.smv_version"
        with open(versionFile, "r") as fp:
            line = fp.readline()
        return line.strip()
    def exception_handling(func):
        """ Decorator function to catch Py4JJavaError and raise SmvDqmValidationError if any.
            Otherwise just pass through the original exception
        """
        def func_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Py4JJavaError as e:
                if (e.java_exception and
                        e.java_exception.getClass().getName() == "org.tresamigos.smv.dqm.SmvDqmValidationError"):
                    dqmValidationResult = json.loads(e.java_exception.getMessage())
                    raise SmvDqmValidationError(dqmValidationResult)
                else:
                    raise e
        return func_wrapper
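    # Applied below (e.g. on runModule) as `@exception_handling`: a JVM-side
    # SmvDqmValidationError then surfaces as the Python SmvDqmValidationError
    # carrying the parsed validation result, instead of a raw Py4JJavaError.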
    def prependDefaultDirs(self):
        """ Ensure that mods in src/main/python and library/ are discoverable.
            If we add more default dirs, we'll make this a set
        """
        self.prepend_source(self.SRC_LIB_PATH)
        self.prepend_source(self.SRC_PROJECT_PATH)

    def removeDefaultDirs(self):
        """ The cleanup version of prependDefaultDirs """
        self.remove_source(self.SRC_PROJECT_PATH)
        self.remove_source(self.SRC_LIB_PATH)
    def all_data_dirs(self):
        """ All the config data dirs as an object. Could be dynamic, so
            calculate each time when used.
        """
        dds = self.py_smvconf.all_data_dirs()
        return namedtuple("DataDirs", dds.keys())(*dds.values())
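    # The namedtuple's fields mirror the configured data-dir keys; e.g.
    # all_data_dirs().inputDir, .outputDir and .historyDir are used below.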
    def appName(self):
        return self.py_smvconf.app_name()

    def appDir(self):
        return self.py_smvconf.app_dir

    def jdbcUrl(self):
        res = self.py_smvconf.merged_props().get('smv.jdbc.url')
        if (res is None):
            raise SmvRuntimeError("JDBC url not specified in SMV config")
        return res

    def jdbcDriver(self):
        res = self.py_smvconf.merged_props().get('smv.jdbc.driver')
        if (res is None):
            raise SmvRuntimeError("JDBC driver is not specified in SMV config")
        return res

    def getConf(self, key):
        return self.py_smvconf.get_run_config(key)
    def setAppDir(self, appDir):
        """ SMV's equivalent of 'cd' for app dirs. """
        self.removeDefaultDirs()

        # this call sets the scala side's picture of app dir and forces
        # the app properties to be read from disk and reevaluated
        self.py_smvconf.set_app_dir(appDir)
        smv.logger.info("Set app dir to " + appDir)
        smv.logger.debug("Current SMV configuration: {}".format(self.py_smvconf.merged_props()))

        # this call will use the dynamic appDir that we just set above
        # to change sys.path, allowing py modules and UDL's to be discovered by python
        self.prependDefaultDirs()

        # as we switch the app dir, we need to re-discover providers
        self.refresh_provider_cache()
    def setDynamicRunConfig(self, runConfig):
        self.py_smvconf.set_dynamic_props(runConfig)
    def getCurrentProperties(self):
        """ Python dict of the current merged props:

            defaultProps ++ appConfProps ++ homeConfProps ++ usrConfProps ++ cmdLineProps ++ dynamicRunConfig

            where the right side wins in the map merge.
        """
        return self.py_smvconf.merged_props()
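    # E.g. a property set in both usrConfProps and cmdLineProps resolves to the
    # cmdLineProps value, since maps further to the right win the merge.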
    def stages(self):
        """Stages is a function as they can be set dynamically on an SmvApp instance"""
        return self.py_smvconf.stage_names()
    def _libsInDir(self, lib_dir, prefix):
        """Use introspection to determine the list of available libs in a given directory."""
        lib_walker = pkgutil.walk_packages([lib_dir], prefix)
        lib_names = [name for (_, name, is_pkg) in lib_walker if not is_pkg]
        return lib_names

    def userLibs(self):
        """Use introspection to determine the list of available user libs."""
        lib_dir = os.path.join(self.appDir(), self.SRC_LIB_PATH)
        return [i for i in self._libsInDir(lib_dir, "")
                if not i.startswith(self.SEMI_PRIVATE_LIB_PREFIX)]

    def semiLibs(self):
        """Use introspection to determine the list of available semiprivate libs."""
        lib_dir = os.path.join(self.appDir(), self.SRC_LIB_PATH, self.SEMI_PRIVATE_LIB_PREFIX)
        return self._libsInDir(lib_dir, self.SEMI_PRIVATE_LIB_PREFIX + ".")

    def smvLibs(self):
        """Use introspection to determine the list of available smv builtin libs."""
        # Note: since sys.path has src/main/python, we need to prefix all discovered
        # objects with `smv.` so that they are importable.
        lib_dir = os.path.join(self.smvHome, "src/main/python/smv")
        return self._libsInDir(lib_dir, "smv.")
    def refresh_provider_cache(self):
        """Re-discover providers and set provider_cache"""
        self.provider_cache = DataSetRepo(self)._all_providers()
    def get_providers_by_prefix(self, fqn_prefix):
        """Discover all providers in the user lib and smv lib that have a
            provider type fqn starting with the given prefix.
        """
        providers = self.provider_cache
        return {fqn: p for (fqn, p) in providers.items() if fqn.startswith(fqn_prefix)}

    def get_provider_by_fqn(self, fqn):
        """Return the provider class for the given provider fqn"""
        providers = self.provider_cache
        return providers.get(fqn)
    def get_connection_by_name(self, name):
        """Get a connection instance by name"""
        props = self.py_smvconf.merged_props()
        type_name = "smv.conn.{}.type".format(name)

        if (type_name in props):
            con_type = props.get(type_name)
            provider_fqn = "conn.{}".format(con_type)
            ConnClass = self.get_provider_by_fqn(provider_fqn)
            return ConnClass(name, props)
        else:
            raise SmvRuntimeError("Connection name {} is not configured with a type".format(name))
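    # A lookup sketch (hypothetical connection name and type value): given
    #     smv.conn.myinput.type = hdfs
    #     smv.conn.myinput.path = hdfs:///data/input
    # get_connection_by_name("myinput") resolves the provider fqn "conn.hdfs"
    # and returns that provider class instantiated as ConnClass("myinput", props).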
    def get_all_connection_names(self):
        """Get all connections defined in the app; return a list of names"""
        props = self.py_smvconf.merged_props()
        res = []
        for k in props:
            s = re.search(r'smv\.conn\.(.*)\.type', k)
            if s:
                name = s.group(1)  # matched in '(.*)'
                res.append(name)
        return res
    def appId(self):
        return self.py_smvconf.app_id()
    # TODO: deprecate the method below. Users should use SmvSchema.discover() instead.
    def discoverSchemaAsSmvSchema(self, path, csvAttributes, n=100000):
        """Discovers the schema of a .csv file and returns a Scala SmvSchema instance

            path          --- path to the csv file
            n             --- number of records used to discover the schema (optional)
            csvAttributes --- Scala CsvAttributes instance (optional)
        """
        return self._jvm.SmvPythonHelper.discoverSchemaAsSmvSchema(path, n, csvAttributes)
    def getSchemaByDataFileAsSmvSchema(self, data_file_name, path=None):
        """Get the schema of a data file from its path and return a Scala SmvSchema instance.

            The result will be None if the corresponding schema file does not exist or is invalid.
        """
        data_file_path = os.path.join(self.inputDir() if path is None else path, data_file_name)
        return self.j_smvPyClient.readSchemaFromDataPathAsSmvSchema(data_file_path)
    def inputDir(self):
        return self.all_data_dirs().inputDir

    def input_connection(self):
        return SmvHdfsConnectionInfo(
            "inputdir",
            {"smv.conn.inputdir.path": self.inputDir()}
        )
    def getFileNamesByType(self, ftype):
        """Return a list of file names which have the postfix ftype"""
        all_files = self._jvm.SmvPythonHelper.getDirList(self.inputDir())
        return [str(f) for f in all_files if f.endswith(ftype)]
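    # Example: getFileNamesByType(".csv") lists the csv files under inputDir().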
    def get_graph_json(self):
        """Generate a json string representing the dependency graph"""
        return SmvAppInfo(self).create_graph_json()

    def get_module_state_json(self, fqns):
        """Generate a json string for the modules' needToRun state of the app

            Args:
                fqns (list(string)): module fqn list to get the state for

            Return:
                (string): json string, e.g. {"stage.mymod": {"needsToRun": True}}
        """
        return SmvAppInfo(self).create_module_state_json(fqns)

    def getModuleResult(self, fqn, forceRun=False):
        """Run a module and get its result, which may not be a DataFrame"""
        df, collector = self.runModule(fqn, forceRun)
        return df

    def load_single_ds(self, fqn):
        """Return the ds for the given fqn"""
        return self.dsm.load(fqn)[0]
    def _to_single_run_res(self, res):
        """run and quick_run of SmvModuleRunner return (list(DF), SmvRunInfoCollector);
            for a single module, convert to (DF, SmvRunInfoCollector)
        """
        (dfs, coll) = res
        return (dfs[0], coll)
    @exception_handling
    def runModule(self, fqn, forceRun=False, quickRun=False, runMonitorCallback=None):
        """Runs an SmvModule by its Fully Qualified Name (fqn)

            Args:
                fqn (str): The FQN of a module
                forceRun (bool): True if the module should be forced to run even if it has
                    persisted output. False otherwise.
                quickRun (bool): skip computing dqm+metadata and persisting csv
                runMonitorCallback: when not a quick run, the runner calls this with ({fqn, status})

            Example:
                To get just the dataframe of the module:
                    dataframe = smvApp.runModule('package.module.SmvModuleClass')[0]
                To get both the dataframe and the run info collector:
                    dataframe, collector = smvApp.runModule('package.module.SmvModuleClass')

            Returns:
                (DataFrame, SmvRunInfoCollector) tuple
                - DataFrame is the computed result of the module
                - SmvRunInfoCollector contains additional information about the run,
                  such as validation results
        """
        ds = self.dsm.load(fqn)[0]
        if (quickRun):
            return self._to_single_run_res(SmvModuleRunner([ds], self).quick_run(forceRun))
        else:
            return self._to_single_run_res(SmvModuleRunner([ds], self, runMonitorCallback).run(forceRun))
    @exception_handling
    def quickRunModule(self, fqn):
        return self.runModule(fqn, forceRun=False, quickRun=True)
    @exception_handling
    def runModuleByName(self, name, forceRun=False, quickRun=False, runMonitorCallback=None):
        """Runs an SmvModule by its name (can be a partial FQN)

            See the `runModule` method above.

            Args:
                name (str): The unique name of a module. Does not have to be the FQN.
                forceRun (bool): True if the module should be forced to run even if it has
                    persisted output. False otherwise.
                quickRun (bool): skip computing dqm+metadata and persisting csv
                runMonitorCallback: when not a quick run, the runner calls this with ({fqn, status})

            Returns:
                (DataFrame, SmvRunInfoCollector) tuple
                - DataFrame is the computed result of the module
                - SmvRunInfoCollector contains additional information about the run,
                  such as validation results
        """
        fqn = self.dsm.inferFqn(name)
        return self.runModule(fqn, forceRun, quickRun, runMonitorCallback)
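    # Example (hypothetical module): any unambiguous fqn suffix works, e.g.
    #     df, collector = smvApp.runModuleByName("employment.Employment")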
    def get_need_to_run(self, roots, keep_roots=False):
        """Given a list of target modules to run, return the list of modules which
            will run and be persisted, in the order in which they should run.

            This is a sub-set of modules_needed_for_run, keeping only the
            non-ephemeral, not-yet-persisted modules.

            Note that some of the roots may not be in this list; to include all
            roots, set `keep_roots` to True.
        """
        visitor = ModulesVisitor(roots)
        return [m for m in visitor.modules_needed_for_run
                if ((not m._is_persisted() and not m.isEphemeral())
                    or (keep_roots and m in roots))
                ]
    def getRunInfo(self, fqn):
        """Returns the run information of a module and all its dependencies
            from the last run.

            Unlike the runModule() method, which returns the run information
            just for that run, this method returns the run information from
            the last run. If no module was run (e.g. the code did not change,
            so the data is read from persistent storage), the
            SmvRunInfoCollector returned from the runModule() method would be
            empty, but the SmvRunInfoCollector returned from this method would
            contain the latest run information about all dependent modules.

            Args:
                fqn (str): fqn of the target module

            Returns:
                SmvRunInfoCollector
        """
        ds = self.dsm.load(fqn)[0]
        return SmvModuleRunner([ds], self).get_runinfo()

    def getRunInfoByPartialName(self, name):
        """Returns the run information of a module and all its dependencies
            from the last run, with the module looked up by partial name.

            See getRunInfo() above for details.

            Args:
                name (str): unique suffix to the fqn of the target module

            Returns:
                SmvRunInfoCollector
        """
        fqn = self.dsm.inferFqn(name)
        return self.getRunInfo(fqn)
    @exception_handling
    def publishModuleToHiveByName(self, name):
        """Publish an SmvModule to Hive by its name (can be a partial FQN)"""
        fqn = self.dsm.inferFqn(name)
        ds = self.load_single_ds(fqn)
        return SmvModuleRunner([ds], self).publish_to_hive()

    def getMetadataJson(self, fqn):
        """Returns the metadata for a given fqn"""
        ds = self.load_single_ds(fqn)
        return ds._get_metadata().toJson()

    def getMetadataHistoryJson(self, fqn):
        """Returns the metadata history for a given fqn"""
        ds = self.load_single_ds(fqn)
        return self._read_meta_hist(ds).toJson()
    def getDsHash(self, name):
        """The current hashOfHash for the named module as a hex string

            Args:
                name (str): The unique name of a module. Does not have to be the FQN.

            Returns:
                (str): The hashOfHash of the named module
        """
        return self.dsm.inferDS(name)[0]._ver_hex()
    def copyToHdfs(self, fileobj, destination):
        """Copies the content of a file object to an HDFS location.

            Args:
                fileobj (file object): a file-like object whose content is to be copied,
                    such as one returned by open(), or StringIO
                destination (str): the destination path in the hadoop file system

            The file object is expected to have been opened in binary read mode.
            The file object is closed when this function completes.
        """
        SmvHDFS(self._jvm.SmvHDFS).writeToFile(fileobj, destination)
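    # Usage sketch (hypothetical paths; binary read mode as noted above):
    #     with open("/tmp/report.csv", "rb") as f:
    #         smvApp.copyToHdfs(f, "hdfs:///data/reports/report.csv")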
    def getStageFromModuleFqn(self, fqn):
        """Returns the stage name for a given fqn"""
        res = [s for s in self.stages() if fqn.startswith(s + ".")]
        if (len(res) == 0):
            raise SmvRuntimeError("Can't find {} from stages {}".format(fqn, ", ".join(self.stages())))
        return res[0]
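    # E.g. with stages ["stage1", "stage2"] (hypothetical names), the fqn
    # "stage1.inputdata.Employment" resolves to stage "stage1".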
    def outputDir(self):
        return self.all_data_dirs().outputDir

    def scalaOption(self, val):
        """Returns a Scala Option containing the value"""
        return self._jvm.scala.Option.apply(val)

    def scalaNone(self):
        """Returns a Scala None value"""
        return self.scalaOption(None)
    def createDFWithLogger(self, schema, data, readerLogger):
        return DataFrame(
            self._jvm.DfCreator.createDFWithLogger(
                self.sparkSession._jsparkSession,
                schema,
                data,
                readerLogger
            ),
            self.sqlContext
        )

    def createDF(self, schema, data=None):
        readerLogger = self._jvm.SmvPythonHelper.getTerminateParserLogger()
        return self.createDFWithLogger(schema, data, readerLogger)
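    # A minimal usage sketch, assuming SMV's semicolon-delimited schema/data
    # string convention:
    #     df = smvApp.createDF("name:String;age:Integer", "alice,30;bob,25")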
    def _mkCsvAttr(self, delimiter=',', quotechar='"', hasHeader=False):
        """Factory method for creating instances of the Scala case class CsvAttributes"""
        return self._jvm.org.tresamigos.smv.CsvAttributes(delimiter, quotechar, hasHeader)

    def defaultCsvWithHeader(self):
        return self._mkCsvAttr(hasHeader=True)

    def defaultTsv(self):
        return self._mkCsvAttr(delimiter='\t')

    def defaultTsvWithHeader(self):
        return self._mkCsvAttr(delimiter='\t', hasHeader=True)
    def abs_path_for_project_path(self, project_path):
        # load the dynamic app dir from scala
        return os.path.abspath(os.path.join(self.appDir(), project_path))

    def prepend_source(self, project_path):
        abs_path = self.abs_path_for_project_path(project_path)
        # source must be added to the front of the path to make sure it is found first
        sys.path.insert(1, abs_path)
        smv.logger.debug("Prepended {} to sys.path".format(abs_path))

    def remove_source(self, project_path):
        try:
            abs_path = self.abs_path_for_project_path(project_path)
            sys.path.remove(abs_path)
            smv.logger.debug("Removed {} from sys.path".format(abs_path))
        except ValueError:
            # ValueError is raised if the project path was not previously
            # added to sys.path
            pass
    def _hist_io_strategy(self, m):
        """Meta history is managed by smvapp and smvmodulerunner; module
            instances do not need to know about it
        """
        hist_dir = self.all_data_dirs().historyDir
        hist_path = "{}/{}.hist".format(hist_dir, m.fqn())
        return SmvJsonOnHdfsPersistenceStrategy(self, hist_path)

    def _read_meta_hist(self, m):
        io_strategy = self._hist_io_strategy(m)
        try:
            hist_json = io_strategy.read()
            hist = SmvMetaHistory().fromJson(hist_json)
        except:
            hist = SmvMetaHistory()
        return hist

    def _purge_current_output_files(self, mods):
        SmvModuleRunner(mods, self).purge_persisted()

    def _dry_run(self, mods):
        """Execute as a dry-run if the dry-run flag is specified.

            This shows which not-yet-persisted modules need to run,
            without actually running the modules.
        """
        mods_need_to_run = self.get_need_to_run(mods, keep_roots=True)

        print("Dry run - modules not persisted:")
        print("----------------------")
        print("\n".join([m.fqn() for m in mods_need_to_run]))
        print("----------------------")

    def _generate_dot_graph(self):
        """Generate the app-level graphviz dot file"""
        dot_graph_str = SmvAppInfo(self).create_graph_dot()
        path = "{}.dot".format(self.appName())
        with open(path, "w") as f:
            f.write(dot_graph_str)

    def _print_dead_modules(self):
        """Print dead modules: modules which do not contribute to any output
            module are considered dead
        """
        SmvAppInfo(self).ls_dead()

    def _modules_to_run(self):
        modPartialNames = self.py_smvconf.mods_to_run
        stageNames = [self.py_smvconf.infer_stage_full_name(f) for f in self.py_smvconf.stages_to_run]
        return self.dsm.modulesToRun(modPartialNames, stageNames, self.cmd_line.runAllApp)

    def _publish_modules(self, mods):
        SmvModuleRunner(mods, self).publish()

    def _publish_modules_to_hive(self, mods):
        SmvModuleRunner(mods, self).publish_to_hive()

    def _publish_modules_through_jdbc(self, mods):
        SmvModuleRunner(mods, self).publish_to_jdbc()

    def _publish_modules_locally(self, mods):
        local_dir = self.cmd_line.exportCsv
        SmvModuleRunner(mods, self).publish_local(local_dir)

    def _generate_output_modules(self, mods):
        SmvModuleRunner(mods, self).run()
    def run(self):
        mods = self._modules_to_run()

        if (self.cmd_line.forceRunAll):
            self._purge_current_output_files(mods)

        if (len(mods) > 0):
            print("Modules to run/publish")
            print("----------------------")
            print("\n".join([m.fqn() for m in mods]))
            print("----------------------")

        # either generate graphs, publish modules, or run output modules (only one will occur)
        if (self.cmd_line.printDeadModules):
            self._print_dead_modules()
        elif (self.cmd_line.dryRun):
            self._dry_run(mods)
        elif (self.cmd_line.graph):
            self._generate_dot_graph()
        elif (self.cmd_line.publishHive):
            self._publish_modules_to_hive(mods)
        elif (self.cmd_line.publish):
            self._publish_modules(mods)
        elif (self.cmd_line.publishJDBC):
            self._publish_modules_through_jdbc(mods)
        elif (self.cmd_line.exportCsv):
            self._publish_modules_locally(mods)
        else:
            self._generate_output_modules(mods)