Source code for smv.smvappinfo

#
# This file is licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from smv.utils import scala_seq_to_list
import json

from smv.modulesvisitor import ModulesVisitor

class SmvAppInfo(object):
    """Provides SmvApp module list, dependency graph etc. for shell and plot.

    This class is mainly for CLI and GUI. The functions are not core to SmvApp.

    NOTE(review): this class was recovered from a whitespace-collapsed
    rendering of the source. Leading spaces inside the string literals used
    for DOT output and for the ``ls`` indentation may originally have been
    wider than the single space kept here -- confirm against upstream.
    """

    def __init__(self, smvApp):
        """Keep references to the app, its ``dsm`` and its stage names.

        Args:
            smvApp: object exposing a ``dsm`` attribute and a ``stages()``
                method; ``stages()`` is called once here and its result is
                used as a sequence of stage-name strings.
        """
        self.smvApp = smvApp
        self.dsm = smvApp.dsm
        self.stages = smvApp.stages()

    def _graph(self):
        """Build graph with nodes:list(SmvGenericModule) and
        edges:list((SmvGenericModule, SmvGenericModule))

        Nodes are everything returned by ``self.dsm.allDataSets()``. An edge
        ``(n, ds)`` is emitted for each ``n`` in ``ds.resolvedRequiresDS``
        that is itself one of the nodes.
        """
        nodes = self.dsm.allDataSets()
        edges = []
        for ds in nodes:
            # Requirements that are not among the nodes are left out.
            edges.extend(
                (req, ds) for req in ds.resolvedRequiresDS if req in nodes
            )
        return (nodes, edges)

    def _common_prefix(self, fqn_list):
        """Given a list of fqns, return the longest common prefix

        The prefix is computed on whole dot-separated components, so
        ``["a.b", "a.bc"]`` yields ``"a"``. An empty list yields ``''``.
        """
        if not fqn_list:
            return ''

        parsed = [s.split(".") for s in fqn_list]
        # The algorithm depends on the ordering of list(str), so LCP
        # of min and max in the group is the LCP of the entire group
        s1 = min(parsed)
        s2 = max(parsed)

        for i, c in enumerate(s1):
            if c != s2[i]:
                return ".".join(s1[:i])
        # No mismatch: s1 is itself a prefix of s2.
        return ".".join(s1)

    def _base_name(self, ds):
        """Return DS's fqn with common prefix removed

        The common prefix is the one shared by all of ``self.stages``. When
        ``ds.fqn()`` does not start with that prefix followed by a dot, the
        fqn is returned unchanged.
        """
        shared_prefix = self._common_prefix(self.stages) + "."
        fqn = ds.fqn()
        if fqn.startswith(shared_prefix):
            return fqn[len(shared_prefix):]
        return fqn

    def create_graph_json(self):
        """Create dependency graph Json string

        Dependency graph does not have modules' state info.

        Returns:
            str: JSON object with ``"nodes"`` (one dict per node holding
            ``fqn``, ``type``, ``version`` and ``description``) and
            ``"edges"`` (``[from_fqn, to_fqn]`` pairs).
        """
        (nodes, edges) = self._graph()

        def node_type(n):
            t = n.dsType()
            if t == "Input":
                return "file"
            # Any other type name is reported with its first letter lowered.
            return t[:1].lower() + t[1:]

        def node_dict(n):
            return {
                "fqn": n.fqn(),
                "type": node_type(n),
                "version": n.version(),
                "description": n.description()
            }

        def edge_pair(f, t):
            return [f.fqn(), t.fqn()]

        return json.dumps({
            "nodes": [node_dict(n) for n in nodes],
            "edges": [edge_pair(f, t) for (f, t) in edges]
        })

    def create_module_state_json(self, fqns):
        """Create all modules needToRun state Json string

        Args:
            fqns: iterable of names, passed positionally to ``self.dsm.load``.

        Returns:
            str: JSON object mapping each loaded module's fqn to
            ``{"needsToRun": <m.needsToRun()>}``.
        """
        nodes = self.dsm.load(*fqns)
        return json.dumps(
            {m.fqn(): {'needsToRun': m.needsToRun()} for m in nodes}
        )

    def create_graph_dot(self):
        """Create graphviz dot graph string for the whole app

        Every node is emitted (nodes whose ``dsType()`` is ``"Input"`` are
        drawn as pink boxes), followed by one ``subgraph cluster_<i>`` per
        stage and then the edges. Node names are base names (see
        ``_base_name``).
        """
        (nodes, edges) = self._graph()

        # Stage name -> nodes whose fqn lives under that stage.
        clusters = {
            s: [n for n in nodes if n.fqn().startswith(s + ".")]
            for s in self.stages
        }

        def _node_str(ds):
            if ds.dsType() == "Input":
                return ' "{}" [shape=box, color="pink"]'.format(
                    self._base_name(ds))
            return ' "{}"'.format(self._base_name(ds))

        def _link_str(f, t):
            return ' "{}" -> "{}" '.format(
                self._base_name(f), self._base_name(t))

        def _cluster_str(i, s, ns):
            members = "; ".join(
                ['"{}"'.format(self._base_name(n)) for n in ns])
            return ' subgraph cluster_{} {{\n'.format(i) \
                + ' label="{}"\n'.format(s) \
                + ' color="#e0e0e0"\n' \
                + ' {}\n'.format(members) \
                + ' }'

        all_nodes = "\n".join([_node_str(n) for n in nodes])
        all_links = "\n".join([_link_str(f, t) for (f, t) in edges])
        all_clusters = "\n".join([
            _cluster_str(i, s, ns)
            for i, (s, ns) in enumerate(clusters.items())
        ])

        return 'digraph G {\n' \
            + ' rankdir="LR";\n' \
            + ' node [style=filled,color="lightblue"]\n' \
            + '{}\n{}\n{}\n'.format(all_nodes, all_clusters, all_links) \
            + '}'

    def _ds_ancestors(self, m):
        """return the module's ancestors as python list

        Reads ``m._ancestor_and_me_visitor.queue`` and drops ``m`` itself.
        """
        return [mod for mod in m._ancestor_and_me_visitor.queue if mod != m]

    def _dead_nodes(self):
        """return all the dead nodes

        A node is dead when it is neither an output (``isSmvOutput()``) nor
        an ancestor of any output.
        """
        nodes = self.dsm.allDataSets()
        outputs = [n for n in nodes if n.isSmvOutput()]
        in_flow = {n for o in outputs for n in self._ds_ancestors(o)}
        return [n for n in nodes if n not in in_flow and n not in outputs]

    def _list_dataset(self, dss, withprefix=False):
        """list ds names, return: list(str)

        Args:
            dss: iterable of modules to list.
            withprefix: when true, each base name is preceded by a
                one-letter type tag in parentheses, e.g. ``"(M) stage.Mod"``.
                Outputs are tagged ``O`` regardless of their ``dsType()``.
                A type with no entry in the tag table is tagged ``?``
                instead of raising ``KeyError``.
        """
        prefix_map = {
            "Output": "O",
            "Input": "I",
            "Module": "M",
            "Model": "m",
            "ModelExec": "x"
        }

        def _node_str(n):
            bn = self._base_name(n)
            if not withprefix:
                return bn
            t = "Output" if n.isSmvOutput() else n.dsType()
            # Listing is display-only; an unrecognised type should not
            # abort the whole listing.
            return '({}) {}'.format(prefix_map.get(t, "?"), bn)

        return [_node_str(n) for n in dss]

    def ls_stage(self):
        """list all stage names"""
        return "\n".join(self.stages)

    def _ls_in_stage(self, s, nodes, indentation=""):
        """list modules in a stage

        Only nodes whose fqn starts with ``s + "."`` are listed, one per
        line, each line prefixed with ``indentation``.
        """
        in_stage = [n for n in nodes if n.fqn().startswith(s + ".")]
        out_list = self._list_dataset(in_stage, True)
        return "\n".join([indentation + ns for ns in out_list])

    def _ls(self, stage, nodes):
        """For given nodes, list the node names under their stages

        With ``stage`` set to ``None`` every stage in ``self.stages`` gets a
        ``"<stage>:"`` header followed by its indented members; otherwise
        only the members of ``stage`` are listed, without header or indent.
        """
        if stage is not None:
            return self._ls_in_stage(stage, nodes)

        return "\n" + "\n".join([
            "{}:\n".format(s) + self._ls_in_stage(s, nodes, " ") + "\n"
            for s in self.stages
        ])

    def ls(self, stage=None):
        """List all modules, under their stages"""
        return self._ls(stage, self.dsm.allDataSets())

    def ls_dead(self, stage=None):
        """List all dead modules, under their stages"""
        return self._ls(stage, self._dead_nodes())

    def ls_ancestors(self, mname):
        """List given module's ancestors, under their stages

        ``mname`` is resolved with ``self.dsm.inferDS``; the first result
        is used.
        """
        m = self.dsm.inferDS(mname)[0]
        return self._ls(None, self._ds_ancestors(m))

    def ls_descendants(self, mname):
        """List given module's descendants, under their stages

        ``mname`` is resolved with ``self.dsm.inferDS``; the first result
        is used. A node is a descendant when the resolved module's fqn
        appears among the fqns of that node's ancestors.
        """
        m = self.dsm.inferDS(mname)[0]
        target_fqn = m.fqn()
        descendants = [
            n for n in self.dsm.allDataSets()
            if any(a.fqn() == target_fqn for a in self._ds_ancestors(n))
        ]
        return self._ls(None, descendants)