diff --git a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
index 5ba7aa7..b9d7214 100644
--- a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
+++ b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
@@ -1,14 +1,34 @@
 # Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
-# SPDX-License-Identifier: MIT-0
+# Licensed under the Amazon Software License (the "License"). You may not use
+# this file except in compliance with the License. A copy of the License is
+# located at
+#
+#  http://aws.amazon.com/asl/
+#
+# and in the "LICENSE" file accompanying this file. This file is distributed
+# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express
+# or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
 
 from __future__ import print_function
 
+import logging
+import os
+
+from pyspark.sql.functions import lit, struct, array, col, concat
+
 from awsglue.context import GlueContext
 from awsglue.dynamicframe import DynamicFrame
 
 from hive_metastore_migration import *
 
 
+logging.basicConfig()
+logger = logging.getLogger(__name__)
+logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL', 'INFO')))
+
+
 def transform_df_to_catalog_import_schema(sql_context, glue_context, df_databases, df_tables, df_partitions):
     df_databases_array = df_databases.select(df_databases['type'], array(df_databases['item']).alias('items'))
     df_tables_array = df_tables.select(df_tables['type'], df_tables['database'],
@@ -40,8 +60,8 @@ def import_datacatalog(sql_context, glue_context, datacatalog_name, databases, t
         connection_options={'catalog.name': datacatalog_name, 'catalog.region': region})
 
 
-def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix
-                             , region):
+def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix,
+                             region):
     # extract
     hive_metastore = HiveMetastore(connection, sql_context)
     hive_metastore.extract_metastore()
@@ -50,17 +70,25 @@ def metastore_full_migration(sc, sql_context, glue_context, connection, datacata
     (databases, tables, partitions) = HiveMetastoreTransformer(
         sc, sql_context, db_prefix, table_prefix).transform(hive_metastore)
 
-    #load
+    # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
 
 
-def metastore_import_from_s3(sql_context, glue_context,db_input_dir, tbl_input_dir, parts_input_dir,
-                             datacatalog_name, region):
+def metastore_import_from_s3(sql_context, glue_context, db_input_dir, tbl_input_dir, parts_input_dir, db_prefix, datacatalog_name,
+                             region):
 
     # extract
     databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
     tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
     partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)
+
+    # Apply the optional database name prefix
+    if db_prefix:
+        databases = databases.withColumn('item', struct(col('item.description'), col('item.locationUri'), concat(lit(db_prefix), col('item.name')).alias('name'), col('item.parameters')))
+        tables = tables.withColumn("database", concat(lit(db_prefix), col('database')).alias('database'))
+        partitions = partitions.withColumn("database", concat(lit(db_prefix), col('database')).alias('database'))
+        partitions = partitions.withColumn('item', struct(col('item.creationTime'), col('item.lastAccessTime'), concat(lit(db_prefix), col('item.namespaceName')).alias('namespaceName'), col('item.parameters'), col('item.storageDescriptor'), col('item.values')))
+
     # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
 
@@ -71,21 +99,29 @@ def main():
     from_s3 = 'from-s3'
     from_jdbc = 'from-jdbc'
     parser = argparse.ArgumentParser(prog=sys.argv[0])
-    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc], help='Choose to migrate metastore either from JDBC or from S3')
-    parser.add_argument('-c', '--connection-name', required=False, help='Glue Connection name for Hive metastore JDBC connection')
-    parser.add_argument('-R', '--region', required=False, help='AWS region of target Glue DataCatalog, default to "us-east-1"')
-    parser.add_argument('-d', '--database-prefix', required=False, help='Optional prefix for database names in Glue DataCatalog')
-    parser.add_argument('-t', '--table-prefix', required=False, help='Optional prefix for table name in Glue DataCatalog')
-    parser.add_argument('-D', '--database-input-path', required=False, help='An S3 path containing json files of metastore database entities')
-    parser.add_argument('-T', '--table-input-path', required=False, help='An S3 path containing json files of metastore table entities')
-    parser.add_argument('-P', '--partition-input-path', required=False, help='An S3 path containing json files of metastore partition entities')
+    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc],
+                        help='Choose to migrate metastore either from JDBC or from S3')
+    parser.add_argument('-c', '--connection-name', required=False,
+                        help='Glue Connection name for Hive metastore JDBC connection')
+    parser.add_argument('-R', '--region', required=False,
+                        help='AWS region of target Glue DataCatalog, default to "us-east-1"')
+    parser.add_argument('-d', '--database-prefix', required=False,
+                        help='Optional prefix for database names in Glue DataCatalog')
+    parser.add_argument('-t', '--table-prefix', required=False,
+                        help='Optional prefix for table name in Glue DataCatalog')
+    parser.add_argument('-D', '--database-input-path', required=False,
+                        help='An S3 path containing json files of metastore database entities')
+    parser.add_argument('-T', '--table-input-path', required=False,
+                        help='An S3 path containing json files of metastore table entities')
+    parser.add_argument('-P', '--partition-input-path', required=False,
+                        help='An S3 path containing json files of metastore partition entities')
 
     options = get_options(parser, sys.argv)
     if options['mode'] == from_s3:
         validate_options_in_mode(
             options=options, mode=from_s3,
             required_options=['database_input_path', 'table_input_path', 'partition_input_path'],
-            not_allowed_options=['database_prefix', 'table_prefix']
+            not_allowed_options=['table_prefix']
         )
     elif options['mode'] == from_jdbc:
         validate_options_in_mode(
@@ -110,6 +146,7 @@ def main():
             db_input_dir=options['database_input_path'],
             tbl_input_dir=options['table_input_path'],
             parts_input_dir=options['partition_input_path'],
+            db_prefix=options.get('database_prefix') or '',
             datacatalog_name='datacatalog',
             region=options.get('region') or 'us-east-1'
         )
@@ -126,5 +163,6 @@ def main():
             region=options.get('region') or 'us-east-1'
         )
 
+
 if __name__ == '__main__':
     main()
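A note on the `if db_prefix:` block in `metastore_import_from_s3`: because the database name lives inside the nested `item` struct, the prefix cannot simply be concatenated onto a top-level column; the struct has to be rebuilt with every field copied through and only the name field replaced. The snippet below is a minimal, self-contained sketch of that technique, not part of the migration job itself: the local SparkSession, the toy `databases` row, and the `dev_` prefix are illustrative assumptions, while the `item.name` layout mirrors the database records the script reads from S3.

```python
# Minimal sketch of the struct-rebuild used to prefix database names.
# Assumptions: a local SparkSession and a toy record shaped roughly like the
# METASTORE_DATABASE_SCHEMA rows; 'dev_' is an arbitrary example prefix.
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, concat, lit, struct

spark = SparkSession.builder.master('local[1]').appName('prefix-sketch').getOrCreate()

databases = spark.createDataFrame([
    Row(type='database',
        item=Row(description='demo database',
                 locationUri='s3://example-bucket/sales/',
                 name='sales',
                 parameters={'owner': 'etl'}))
])

db_prefix = 'dev_'

# Rebuild the nested struct: every field is selected unchanged except 'name',
# which becomes concat(prefix, original name); the alias keeps the field name
# identical so the frame still matches the expected import schema.
databases = databases.withColumn(
    'item',
    struct(
        col('item.description'),
        col('item.locationUri'),
        concat(lit(db_prefix), col('item.name')).alias('name'),
        col('item.parameters'),
    )
)

databases.select('item.name').show()  # -> dev_sales
```

The same pattern is applied to the partition records in the diff, where both the flat `database` column and the `namespaceName` field inside the partition `item` struct must be prefixed consistently, otherwise the imported partitions would point at unprefixed database names.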
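On the CLI side, the relaxed validation means `--database-prefix` is now accepted together with `--mode from-s3`. The fragment below is a hypothetical, simplified stand-in for the real parser (the actual script routes the parsed values through `get_options` and `validate_options_in_mode` from `hive_metastore_migration`); it only demonstrates the flag combination the change permits, with placeholder S3 paths.

```python
# Hypothetical illustration of the now-allowed from-s3 + --database-prefix combination.
# Only the relevant arguments are declared; the paths and prefix are placeholders.
import argparse

parser = argparse.ArgumentParser(prog='import_into_datacatalog.py')
parser.add_argument('-m', '--mode', required=True, choices=['from-s3', 'from-jdbc'])
parser.add_argument('-d', '--database-prefix', required=False)
parser.add_argument('-D', '--database-input-path', required=False)
parser.add_argument('-T', '--table-input-path', required=False)
parser.add_argument('-P', '--partition-input-path', required=False)

args = parser.parse_args([
    '--mode', 'from-s3',
    '--database-input-path', 's3://example-bucket/export/databases/',
    '--table-input-path', 's3://example-bucket/export/tables/',
    '--partition-input-path', 's3://example-bucket/export/partitions/',
    '--database-prefix', 'dev_',
])
print(args.mode, args.database_prefix)  # from-s3 dev_
```

When the script runs for real, these values come in through `sys.argv` (for example as job arguments), and an empty prefix (`options.get('database_prefix') or ''`) leaves all names untouched.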