# Source code for smv.smvmodulerunner

#
# This file is licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import smv
from smv.modulesvisitor import ModulesVisitor
from smv.smviostrategy import SmvCsvPersistenceStrategy, SmvJsonOnHdfsPersistenceStrategy
from smv.smvmetadata import SmvMetaHistory
from smv.runinfo import SmvRunInfoCollector
from smv.utils import scala_seq_to_list, is_string
from smv.error import SmvRuntimeError, SmvMetadataValidationError

class SmvModuleRunner(object):
    """Represent a run-transaction over a group of modules.

    Provides the single entry point to run a group of root modules (and
    the upstream modules they need), and to publish or purge their results.

    Args:
        modules: the root modules; their ``data`` is what ``run`` and
            ``quick_run`` return, in this order.
        smvApp: the owning app, used for data directories, metadata history
            and the Spark context's application id.
        runMonitorCallback: optional callable invoked with a dict before and
            after each module runs (see ``_create_df``).
    """

    def __init__(self, modules, smvApp, runMonitorCallback=None):
        self.roots = modules
        self.smvApp = smvApp
        self.log = smv.logger
        self.visitor = ModulesVisitor(modules)
        self.runMonitorCallback = runMonitorCallback

    def run(self, forceRun=False):
        """Run the roots and their dependencies, including post actions.

        Args:
            forceRun (bool): passed through to each module's ``_do_it``.

        Returns:
            tuple: ``(dfs, collector)`` where ``dfs`` is the ``data`` of each
            root module in ``self.roots`` order and ``collector`` is the
            ``SmvRunInfoCollector`` populated during the run.
        """
        # Modules that still need their post_action run. Tracked so that
        # post_action runs once and only once per transaction; the set is
        # updated by _create_df and _force_post and is eventually emptied.
        # See docs/dev/SmvGenericModule/SmvModuleRunner.md for details.
        mods_to_run_post_action = set(self.visitor.modules_needed_for_run)

        # Cache of already-computed results shared across the whole visit;
        # handed to each module's `_do_it` as `fqn2df`.
        known = {}

        collector = SmvRunInfoCollector()

        # Do the real module calculation. When modules persist, their
        # post_actions and those of ancestor ephemeral modules are run.
        self._create_df(known, mods_to_run_post_action, collector, forceRun=forceRun)

        # Ephemeral modules with no persisting module downstream (which must
        # be among the roots) still need an action forced to run their
        # post actions.
        self._force_post(mods_to_run_post_action, collector)

        dfs = [m.data for m in self.roots]
        return (dfs, collector)

    def quick_run(self, forceRun=False):
        """Run the roots in quick-run mode.

        An empty post-action set is passed to ``_create_df`` and
        ``_force_post`` is not called.

        Args:
            forceRun (bool): passed through to each module's ``_do_it``.

        Returns:
            tuple: ``(dfs, collector)``, as for ``run``.
        """
        known = {}
        collector = SmvRunInfoCollector()
        # `collector` must occupy its own positional slot; passing forceRun
        # positionally here would land it in the collector parameter.
        self._create_df(known, set(), collector, forceRun=forceRun, is_quick_run=True)
        dfs = [m.data for m in self.roots]
        return (dfs, collector)

    def get_runinfo(self):
        """Build run info for the modules that need to run.

        For each module visited with ``need_to_run_only=True``, records its
        fqn, its ``_get_metadata()`` and its metadata history read via the
        app.

        Returns:
            SmvRunInfoCollector: the populated collector.
        """
        collector = SmvRunInfoCollector()

        def add_to_coll(m, _collector):
            hist = self.smvApp._read_meta_hist(m)
            _collector.add_runinfo(m.fqn(), m._get_metadata(), hist)

        self.visitor.dfs_visit(add_to_coll, collector, need_to_run_only=True)
        return collector

    # TODO: remove all the publish* methods below once the generic output
    # module replaces them.

    def publish(self, publish_dir=None):
        """Run the roots, then write each root's data, metadata and history.

        For each root, writes ``<publish_dir>/<fqn>.csv`` (data),
        ``<fqn>.meta`` (module metadata JSON) and ``<fqn>.hist`` (metadata
        history JSON).

        Args:
            publish_dir (str): target directory. Defaults to
                ``"<publishDir>/<publishVersion>"`` from the app's data dirs.
        """
        # run before publish
        self.run()

        if publish_dir is None:
            data_dirs = self.smvApp.all_data_dirs()
            publish_dir = "{}/{}".format(data_dirs.publishDir, data_dirs.publishVersion)

        for m in self.roots:
            publish_base_path = "{}/{}".format(publish_dir, m.fqn())
            publish_csv_path = publish_base_path + ".csv"
            publish_meta_path = publish_base_path + ".meta"
            publish_hist_path = publish_base_path + ".hist"

            SmvCsvPersistenceStrategy(m.smvApp, m.fqn(), publish_csv_path).write(m.data)
            SmvJsonOnHdfsPersistenceStrategy(m.smvApp, publish_meta_path).write(
                m.module_meta.toJson())
            hist = self.smvApp._read_meta_hist(m)
            SmvJsonOnHdfsPersistenceStrategy(m.smvApp, publish_hist_path).write(
                hist.toJson())

    def publish_to_hive(self):
        """Run the roots, then call ``exportToHive()`` on each root."""
        # run before publish
        self.run()
        for m in self.roots:
            m.exportToHive()

    def publish_to_jdbc(self):
        """Run the roots, then call ``publishThroughJDBC()`` on each root."""
        self.run()
        for m in self.roots:
            m.publishThroughJDBC()

    def publish_local(self, local_dir):
        """Run the roots, then export each root's data as CSV.

        Args:
            local_dir (str): directory; each root is written to
                ``<local_dir>/<versioned_fqn>`` via ``smvExportCsv``.
        """
        self.run()
        for m in self.roots:
            csv_path = "{}/{}".format(local_dir, m.versioned_fqn)
            m.data.smvExportCsv(csv_path)

    def purge_persisted(self):
        """Remove persisted output and metadata of the visited modules.

        Visits modules without the ``need_to_run_only`` filter and calls
        ``remove()`` on each module's ``persistStrategy()`` and
        ``metaStrategy()``.
        """
        def cleaner(m, state):
            m.persistStrategy().remove()
            m.metaStrategy().remove()

        self.visitor.dfs_visit(cleaner, None)

    def _create_df(self, known, need_post, collector, forceRun=False, is_quick_run=False):
        """Run every module that needs to run, in ``dfs_visit`` order.

        Args:
            known (dict): shared cache passed to each module's ``_do_it``.
            need_post (set): modules still awaiting post_action, passed to
                ``_do_it`` (which may update it).
            collector: run-info collector passed to ``_do_it``.
            forceRun (bool): passed through to ``_do_it``.
            is_quick_run (bool): passed through to ``_do_it``.

        If ``runMonitorCallback`` is set, it receives a ``'started'`` event
        before and a ``'completed'`` event after each module runs.
        """
        # Run module and create df. When persisting, post_action
        # will run on the current module and all upstream modules.
        def runner(m, state):
            (fqn2df, run_set, coll) = state
            fqn = m.fqn()

            # tell monitor m is running
            if self.runMonitorCallback is not None:
                self.runMonitorCallback({
                    'fqn': fqn,
                    'status': 'started',
                    'applicationId': self.smvApp.sc.applicationId,
                })

            # Run module
            m._do_it(fqn2df, run_set, coll, forceRun, is_quick_run)

            # tell monitor m is done
            if self.runMonitorCallback is not None:
                self.runMonitorCallback({
                    'fqn': fqn,
                    'status': 'completed',
                    'needsToRun': m.needsToRun(),
                })

        self.visitor.dfs_visit(runner, (known, need_post, collector), need_to_run_only=True)

    def _force_post(self, need_post, collector):
        """Force post actions for modules still left in ``need_post``.

        Args:
            need_post: modules still awaiting post_action; nothing is done
                when it is empty.
            collector: run-info collector passed to ``_force_post_action``.
        """
        # If there are still modules left for post_action, force a run here
        # to run them and all leftovers on their upstream.
        if need_post:
            self.log.debug("leftover mods need to run post action: {}".format(
                [m.fqn() for m in need_post]
            ))

            def force_run(mod, state):
                (run_set, coll) = state
                mod._force_post_action(run_set, coll)

            # Note: bfs_visit is used here to run downstream first. In the
            # case of A<-B<-C all needing to run, this way only C needs a
            # forced action, and A's and B's post actions can also be
            # calculated from it.
            self.visitor.bfs_visit(force_run, (need_post, collector), need_to_run_only=True)