# Source code for smv.iomod.outputs
#
# This file is licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
from smv.iomod.base import SmvSparkDfOutput, AsTable, AsFile
from smv.smviostrategy import SmvCsvOnHdfsIoStrategy, SmvJdbcIoStrategy, SmvHiveIoStrategy, SmvSchemaOnHdfsIoStrategy
from smv.csv_attributes import CsvAttributes
from smv.utils import scala_seq_to_list
class WithSparkDfWriter(object):
    """Mixin that supplies a write mode to output modules using the Spark df writer."""

    def writeMode(self):
        """Return the write mode for the Spark DataFrameWriter.

        Valid values an override may return:
            - "append"
            - "overwrite"
            - "ignore"
            - "error" or "errorifexists" (default)
        """
        default_mode = "errorifexists"
        return default_mode
class SmvJdbcOutputTable(SmvSparkDfOutput, WithSparkDfWriter, AsTable):
    """Output module that writes its DataFrame through ``SmvJdbcIoStrategy``.

    Concrete modules are expected to provide:
        - requiresDS
        - connectionName
        - tableName
        - writeMode: optional, default "errorifexists"
    """

    def connectionType(self):
        """Return the connection type tag of this output: ``'jdbc'``."""
        return 'jdbc'

    def doRun(self, known):
        """Write the module's DataFrame to the configured table and return it.

        Args:
            known: passed straight through to ``self.get_spark_df``
                (NOTE(review): presumably the already-resolved upstream
                results -- confirm against ``SmvSparkDfOutput``).

        Returns:
            The same DataFrame that was written.
        """
        df = self.get_spark_df(known)
        connection = self.get_connection()
        strategy = SmvJdbcIoStrategy(
            self.smvApp,
            connection,
            self.tableName(),
            self.writeMode(),
        )
        strategy.write(df)
        # Hand the DataFrame back so that meta can be calculated from it.
        # TODO: need to review whether we should even calculate meta for output
        return df
class SmvHiveOutputTable(SmvSparkDfOutput, WithSparkDfWriter, AsTable):
    """Output module that writes its DataFrame through ``SmvHiveIoStrategy``.

    Concrete modules are expected to provide:
        - requiresDS
        - connectionName
        - tableName
        - writeMode: optional, default "errorifexists"
    """

    def connectionType(self):
        """Return the connection type tag of this output: ``'hive'``."""
        return 'hive'

    def doRun(self, known):
        """Write the module's DataFrame to the configured table and return it.

        Args:
            known: passed straight through to ``self.get_spark_df``
                (NOTE(review): presumably the already-resolved upstream
                results -- confirm against ``SmvSparkDfOutput``).

        Returns:
            The same DataFrame that was written.
        """
        df = self.get_spark_df(known)
        connection = self.get_connection()
        strategy = SmvHiveIoStrategy(
            self.smvApp,
            connection,
            self.tableName(),
            self.writeMode(),
        )
        strategy.write(df)
        return df
class SmvCsvOutputFile(SmvSparkDfOutput, AsFile):
    """Output module that writes its DataFrame as CSV plus a schema file.

    User need to implement
        - requiresDS
        - connectionName
        - fileName
    """

    def writeMode(self):
        """Default write mode is overwrite, and currently only support overwrite
        """
        return "overwrite"

    def doRun(self, known):
        """Write the DataFrame and its schema under the connection's path.

        The data goes to ``<connection path>/<fileName()>``. The schema goes
        to the same path with a trailing ``.csv`` replaced by ``.schema``; if
        the file name does not end in ``.csv``, ``.schema`` is appended
        instead so the schema never targets the same path as the data.

        Args:
            known: passed straight through to ``self.get_spark_df``
                (NOTE(review): presumably the already-resolved upstream
                results -- confirm against ``SmvSparkDfOutput``).

        Returns:
            The same DataFrame that was written.
        """
        data = self.get_spark_df(known)
        file_path = os.path.join(self.get_connection().path, self.fileName())

        # Raw string: "\." in a plain literal is an invalid escape sequence
        # and triggers a Deprecation/SyntaxWarning on modern Python.
        schema_path, replaced = re.subn(r"\.csv$", ".schema", file_path)
        if not replaced:
            # No ".csv" suffix to swap: without this fallback schema_path
            # would equal file_path and the schema write would target the
            # very path the CSV data was just written to.
            schema_path = file_path + ".schema"

        schema = self.smvApp.smvSchemaObj.fromDataFrame(
            data._jdf,
            "_SmvStrNull_",
            self.smvApp.scalaOption(CsvAttributes()),
        )

        # Data first, then the schema, both with this module's write mode.
        SmvCsvOnHdfsIoStrategy(
            self.smvApp, file_path, schema, None, self.writeMode()
        ).write(data)
        SmvSchemaOnHdfsIoStrategy(
            self.smvApp, schema_path, self.writeMode()
        ).write(schema)
        return data
# Public API of this module: the concrete output classes. The
# WithSparkDfWriter mixin is not listed here.
__all__ = [
    "SmvJdbcOutputTable",
    "SmvHiveOutputTable",
    "SmvCsvOutputFile",
]