Source code for smv.utils

#
# This file is licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.sql.column import Column
from pyspark.sql import DataFrame

def for_name(name):
    """Resolve a dotted name to the object it refers to.

    Python counterpart of Java's ``Class.forName``.

    Args:
        name: either a bare attribute name, which is looked up on the
            ``__main__`` module, or a dotted path whose last component is
            the attribute to fetch (e.g. ``"pkg.module.ClassName"``).

    Returns:
        The object found by importing everything before the last dot and
        then walking the remaining components with ``getattr``.
    """
    module_path, dot, _ = name.rpartition('.')
    if not dot:
        # No package qualifier: look the name up in the running script.
        return getattr(__import__('__main__'), name)

    # __import__ returns the top-level package, so every component after
    # the first has to be reached through attribute access.
    target = __import__(module_path)
    for attr in name.split('.')[1:]:
        target = getattr(target, attr)
    return target
def smv_copy_array(sc, *cols):
    """Copy a Python sequence into the matching Java array or list.

    The Java container is chosen from the type of the FIRST element only;
    the remaining elements are handled the same way without being checked.

    Args:
        sc: object exposing ``_gateway`` (used for ``new_array``) and
            ``_jvm`` (used to reach Java classes) -- presumably the
            SparkContext; confirm against callers.
        *cols: the elements to copy. Supported element kinds are strings,
            ``Column``, ``DataFrame``, ``list`` and ``tuple``.

    Returns:
        * no elements: an empty Java ``String`` array;
        * strings: a Java ``String`` array;
        * ``Column`` / ``DataFrame``: a Java array of the wrapped
          ``_jc`` / ``_jdf`` objects;
        * lists: a ``java.util.ArrayList`` whose items are the recursively
          copied inner lists;
        * tuples: a ``java.util.ArrayList`` holding one ``ArrayList`` per
          tuple.

    Raises:
        RuntimeError: if the first element is of an unsupported type.
    """
    try:
        string_types = basestring  # Python 2: covers both str and unicode
    except NameError:
        string_types = str  # Python 3 has no ``basestring``

    jvm = sc._jvm
    new_array = sc._gateway.new_array

    if not cols:
        # may need to pass the correct java type somehow
        return new_array(jvm.java.lang.String, 0)

    elem = cols[0]
    if isinstance(elem, string_types):
        jcols = new_array(jvm.java.lang.String, len(cols))
        for i, col in enumerate(cols):
            jcols[i] = col
    elif isinstance(elem, Column):
        jcols = new_array(jvm.org.apache.spark.sql.Column, len(cols))
        for i, col in enumerate(cols):
            jcols[i] = col._jc
    elif isinstance(elem, DataFrame):
        jcols = new_array(jvm.org.apache.spark.sql.DataFrame, len(cols))
        for i, col in enumerate(cols):
            jcols[i] = col._jdf
    elif isinstance(elem, list):
        # A list of lists: use a Java List as the outermost container.
        # An Array[Array] will not always work, because the inner lists
        # may be of different lengths.
        jcols = jvm.java.util.ArrayList()
        for inner in cols:
            jcols.append(smv_copy_array(sc, *inner))
    elif isinstance(elem, tuple):
        jcols = jvm.java.util.ArrayList()
        for tup in cols:
            # Use a Java List for each tuple
            je = jvm.java.util.ArrayList()
            for e in tup:
                je.append(e)
            jcols.append(je)
    else:
        raise RuntimeError("Cannot copy array of type", type(elem))

    return jcols
def check_socket(port):
    """Check whether the given port is open to bind.

    A fresh IPv4 stream socket is bound to ``('', port)`` and is closed
    again before the function returns.

    Args:
        port: the port number to probe.

    Returns:
        True if the bind succeeded, False if the bind raised any ordinary
        exception (for example because the port is already in use).
    """
    import socket
    from contextlib import closing

    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        try:
            sock.bind(('', port))
        except Exception:
            # Port is not open. ``Exception`` rather than a bare ``except``
            # so that KeyboardInterrupt / SystemExit are not swallowed.
            return False
        # Port is open
        return True