# Source code for smv.smvinput

# This file is licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SMV DataSet Framework's SmvInputBase interface

This module defines the abstract classes which form the SmvGenericModule's
input DS Framework for clients' projects.
"""

import abc

from smv.iomod import SmvCsvInputFile, SmvMultiCsvInputFiles, SmvHiveInputTable, SmvCsvStringInputData
from smv.conn import SmvHiveConnectionInfo


class SmvCsvFile(SmvCsvInputFile):
    """Input from a file in CSV format

        Base class for CSV file input.
        Subclasses must implement the ``path`` method.

        Example:
            >>> class MyCsvFile(SmvCsvFile):
            >>>     def path(self):
            >>>         return "path/relative/to/smvInputDir/file.csv"
    """

    @abc.abstractmethod
    def path(self):
        """Relative path to the csv file (must be provided by the subclass)"""

    def fileName(self):
        """Return the file name, which is whatever ``path`` returns"""
        csv_path = self.path()
        return csv_path

    def run(self, df):
        """Hook applied to the DataFrame produced by the parent's ``doRun``

            The default implementation hands ``df`` back untouched.

            Args:
                df: the object returned by the parent class's ``doRun``

            Returns:
                the same ``df``
        """
        return df

    def doRun(self, known):
        """Run the parent class's ``doRun`` and pass its result through ``run``

            Args:
                known: forwarded unchanged to the parent's ``doRun``

            Returns:
                whatever ``run`` returns for the parent's result
        """
        return self.run(super(SmvCsvFile, self).doRun(known))

    def connectionName(self):
        """Always None: no connection name is specified by this class"""
        return None

    def get_connection(self):
        """Return the connection obtained from ``self.smvApp.input_connection()``"""
        app = self.smvApp
        return app.input_connection()
class SmvSqlCsvFile(SmvCsvFile):
    """Input from a file in CSV format and using a SQL query to access it

        The DataFrame read from the CSV file is registered as a temporary
        table named ``tableName``, the SQL text returned by ``query`` is
        executed against it, and the temporary table is dropped afterwards.
    """

    # temporary table name
    tableName = "df"

    def query(self):
        """Query used to extract data from the table which reads the CSV file

            Override this to specify your own query (optional). Default is
            equivalent to 'select * from ' + tableName.

            Returns:
                (str): query
        """
        return "select * from " + self.tableName

    def run(self, df):
        """Run ``query`` against ``df`` registered as a temporary table

            Args:
                df: DataFrame of the input CSV file

            Returns:
                the result of executing ``self.query()`` through
                ``self.smvApp.sqlContext.sql``
        """
        sql_ctx = self.smvApp.sqlContext

        # temporarily register DataFrame of input CSV file as a table
        df.registerTempTable(self.tableName)
        try:
            # execute the table query
            return sql_ctx.sql(self.query())
        finally:
            # drop the temporary table even when building or running the
            # query raised, so a failed run does not leave it registered
            sql_ctx.sql("drop table " + self.tableName)
class SmvMultiCsvFiles(SmvMultiCsvInputFiles):
    """Raw input from multiple csv files sharing single schema

        Instead of a single input file, specify a data dir with files which
        share the same schema.
    """

    @abc.abstractmethod
    def dir(self):
        """Path to the directory containing the csv files and their schema

            Returns:
                (str): path
        """

    def dirName(self):
        """Return the directory name, which is whatever ``dir`` returns"""
        data_dir = self.dir()
        return data_dir

    def run(self, df):
        """Hook applied to the DataFrame produced by the parent's ``doRun``

            The default implementation hands ``df`` back untouched.

            Args:
                df: the object returned by the parent class's ``doRun``

            Returns:
                the same ``df``
        """
        return df

    def doRun(self, known):
        """Run the parent class's ``doRun`` and pass its result through ``run``

            Args:
                known: forwarded unchanged to the parent's ``doRun``

            Returns:
                whatever ``run`` returns for the parent's result
        """
        return self.run(super(SmvMultiCsvFiles, self).doRun(known))

    def connectionName(self):
        """Always None: no connection name is specified by this class"""
        return None

    def get_connection(self):
        """Return the connection obtained from ``self.smvApp.input_connection()``"""
        app = self.smvApp
        return app.input_connection()
class SmvHiveTable(SmvHiveInputTable):
    """Input from a Hive table

        This is kept for backward compatibility and will be deprecated.
        Please use iomod.SmvHiveInputTable instead.

        User needs to implement:

            - tableName

        A custom query at read time is no longer supported; please use a
        downstream module to process the data.
    """

    @abc.abstractmethod
    def tableName(self):
        """User-specified name of the Hive table to extract input from

            Override this to specify your own table name.

            Returns:
                (str): table name
        """

    def connectionName(self):
        """Always None for this legacy interface"""
        # The old hive interface does not support a separate schema
        # specification, so there is no need for a connection name
        return None

    def get_connection(self):
        """Return an ``SmvHiveConnectionInfo`` built with an empty dict"""
        # Create an empty connection info so SmvHiveInputTable will
        # default to referring to the tableName without a schema name
        empty_conn_info = SmvHiveConnectionInfo("hiveschema", {})
        return empty_conn_info
class SmvCsvStringData(SmvCsvStringInputData):
    """Subclass of ``SmvCsvStringInputData`` that adds no behavior of its own"""
# Names exported by ``from smv.smvinput import *``
__all__ = [
    'SmvMultiCsvFiles',
    'SmvCsvFile',
    'SmvSqlCsvFile',
    'SmvCsvStringData',
    'SmvHiveTable',
]