Module create.database in plugin tabular v0.5.3
Author(s) | Markus Binsteiner (markus@frkl.io) |
Tags | tabular |
Python class | kiara_plugin.tabular.modules.db.CreateDatabaseModule |
Module configuration options
Configuration class: kiara_plugin.tabular.modules.db.CreateDatabaseModuleConfig
Name | Description | Type | Required? | Default |
---|---|---|---|---|
source_type | The value type of the source value. | string | true | null |
target_type | The value type of the target. | string | true | null |
constants | Value constants for this module. | object | false | null |
defaults | Value defaults for this module. | object | false | null |
include_source_metadata | Whether to include a table with metadata about the source files. | boolean or null | false | null |
ignore_errors | Whether to ignore convert errors and omit the failed items. | boolean | false | false |
include_source_file_content | When including source metadata, whether to also include the original raw (string) content. | boolean | false | false |
merge_into_single_table | Whether to merge all csv files into a single table. | boolean | false | false |
Module source code
class CreateDatabaseModule(CreateFromModule):
    """Create a 'database' value from supported source types.

    Supported sources (one handler method per source type):
      - ``file``        : a single csv file (other extensions are rejected)
      - ``file_bundle`` : a bundle of files; each csv file becomes one table
      - ``table``       : a single table, stored under a configurable table name
      - ``tables``      : a tables value, converted via a utility helper

    Behavior is controlled by ``CreateDatabaseModuleConfig`` (see module docs):
    ``ignore_errors``, ``include_source_metadata``,
    ``include_source_file_content`` and ``merge_into_single_table``.
    """

    _module_type_name = "create.database"
    _config_cls = CreateDatabaseModuleConfig

    def create__database__from__file(
        self, source_value: Value, optional: ValueMap
    ) -> Any:
        """Create a database from a file.

        Currently, only csv files are supported.

        Returns the path to a newly created sqlite database file containing
        one table named after the source file (with '-'/'.' replaced by '_').
        """
        import csv as py_csv

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            # NOTE(review): db_path is a file, not a directory — rmtree on it is a
            # silent no-op under ignore_errors=True, so the temp dir may leak.
            # Possibly this should remove temp_f instead; confirm before changing.
            shutil.rmtree(db_path, ignore_errors=True)

        atexit.register(cleanup)

        file_item: KiaraFile = source_value.data
        if not file_item.file_name.endswith(".csv"):
            raise KiaraProcessingException(
                "Only csv files are supported (at the moment)."
            )

        # Derive a valid sqlite table name from the file name.
        table_name = file_item.file_name_without_extension
        table_name = table_name.replace("-", "_")
        table_name = table_name.replace(".", "_")

        # User-provided header flag wins; otherwise try to sniff it from the
        # first 2KB of the file, defaulting to 'has header' on sniff failure.
        has_header = optional.get_value_data("first_row_is_header")
        if has_header is None:
            try:
                has_header = True
                with open(source_value.data.path, "rt") as csvfile:
                    sniffer = py_csv.Sniffer()
                    has_header = sniffer.has_header(csvfile.read(2048))
                    csvfile.seek(0)
            except Exception as e:
                # TODO: add this to the process log
                log_message(
                    "csv_sniffer.error",
                    file=source_value.data.path,
                    error=str(e),
                    details="assuming csv file has header",
                )

        try:
            create_sqlite_table_from_tabular_file(
                target_db_file=db_path,
                file_item=file_item,
                table_name=table_name,
                no_headers=not has_header,
            )
        except Exception as e:
            # BUGFIX: was 'is True or True', which always swallowed errors and
            # ignored the configured 'ignore_errors' value (default: False).
            if self.get_config_value("ignore_errors") is True:
                log_message("ignore.import_file", file=file_item.path, reason=str(e))
            else:
                raise KiaraProcessingException(e)

        include_raw_content_in_file_info: bool = self.get_config_value(
            "include_source_metadata"
        )
        if include_raw_content_in_file_info:
            # Add a 'source_files_metadata' table describing the source file,
            # optionally including its raw (string) content.
            db = KiaraDatabase(db_file_path=db_path)
            db.create_if_not_exists()
            include_content: bool = self.get_config_value("include_source_file_content")
            db._unlock_db()
            included_files = {file_item.file_name: file_item}
            file_bundle = KiaraFileBundle.create_from_file_models(
                files=included_files, bundle_name=file_item.file_name
            )
            insert_db_table_from_file_bundle(
                database=db,
                file_bundle=file_bundle,
                table_name="source_files_metadata",
                include_content=include_content,
            )
            db._lock_db()

        return db_path

    def create__database__from__file_bundle(
        self, source_value: Value, job_log: JobLog
    ) -> Any:
        """Create a database from a file_bundle value.

        Currently, only csv files are supported; files in the source
        file_bundle that have different extensions will be ignored.

        Unless 'merge_into_single_table' is set to 'True' in the module
        configuration, each csv file will create one table in the resulting
        database. If this option is set, only a single table with all the
        values of all csv files will be created. For this to work, all csv
        files should follow the same schema.
        """
        merge_into_single_table = self.get_config_value("merge_into_single_table")
        if merge_into_single_table:
            raise NotImplementedError("Not supported (yet).")

        include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
            "include_source_metadata"
        )

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            # NOTE(review): same concern as in create__database__from__file —
            # rmtree on a file path is a silent no-op; confirm temp_f cleanup.
            shutil.rmtree(db_path, ignore_errors=True)

        atexit.register(cleanup)

        db = KiaraDatabase(db_file_path=db_path)
        db.create_if_not_exists()

        # TODO: check whether/how to add indexes

        bundle: KiaraFileBundle = source_value.data

        # Track, per relative path: whether it was imported, and any error text.
        table_names: List[str] = []
        included_files: Dict[str, bool] = {}
        errors: Dict[str, Union[None, str]] = {}
        for rel_path in sorted(bundle.included_files.keys()):

            if not rel_path.endswith(".csv"):
                job_log.add_log(
                    f"Ignoring file (not csv): {rel_path}", log_level=logging.INFO
                )
                included_files[rel_path] = False
                errors[rel_path] = "Not a csv file."
                continue

            file_item = bundle.included_files[rel_path]
            # Avoid table-name collisions between files with the same stem.
            table_name = find_free_id(
                stem=file_item.file_name_without_extension, current_ids=table_names
            )
            try:
                table_names.append(table_name)
                create_sqlite_table_from_tabular_file(
                    target_db_file=db_path, file_item=file_item, table_name=table_name
                )
                included_files[rel_path] = True
            except Exception as e:
                included_files[rel_path] = False
                errors[rel_path] = KiaraException.get_root_details(e)

                # BUGFIX: was 'is True or True', which always swallowed errors
                # and ignored the configured 'ignore_errors' value.
                if self.get_config_value("ignore_errors") is True:
                    log_message("ignore.import_file", file=rel_path, reason=str(e))
                    continue

                raise KiaraProcessingException(e)

        # None (unset) defaults to including the metadata table here.
        if include_raw_content_in_file_info in [None, True]:
            include_content: bool = self.get_config_value("include_source_file_content")
            db._unlock_db()

            insert_db_table_from_file_bundle(
                database=db,
                file_bundle=source_value.data,
                table_name="source_files_metadata",
                include_content=include_content,
                included_files=included_files,
                errors=errors,
            )
            db._lock_db()

        return db_path

    def create_optional_inputs(
        self, source_type: str, target_type
    ) -> Union[Mapping[str, Mapping[str, Any]], None]:
        """Declare extra optional inputs, depending on source/target types.

        - file -> database: 'first_row_is_header' (auto-detected if omitted)
        - table -> database: 'table_name' (default: 'imported_table')
        """
        inputs = {}
        if source_type == "file":
            inputs["first_row_is_header"] = {
                "type": "boolean",
                "optional": True,
                "doc": "Whether the first row of the file is a header row. If not provided, kiara will try to auto-determine.",
            }

        if target_type == "database" and source_type == "table":
            inputs["table_name"] = {
                "type": "string",
                "doc": "The name of the table in the new database.",
                "default": "imported_table",
            }

        return inputs

    def create__database__from__tables(
        self, source_value: Value, optional: ValueMap
    ) -> Any:
        """Create a database value from a list of tables."""
        from kiara_plugin.tabular.utils.tables import create_database_from_tables

        tables: KiaraTables = source_value.data
        db = create_database_from_tables(tables=tables)

        return db

    def create__database__from__table(
        self, source_value: Value, optional: ValueMap
    ) -> Any:
        """Create a database value from a table.

        The table is written to a single sqlite table, named by the optional
        'table_name' input (falling back to DEFAULT_TABLE_NAME), in chunks of
        DEFAULT_TABULAR_DATA_CHUNK_SIZE rows.
        """
        table_name = optional.get_value_data("table_name")
        if not table_name:
            table_name = DEFAULT_TABLE_NAME

        table: KiaraTable = source_value.data
        arrow_table = table.arrow_table

        # No column renaming / index creation (yet).
        column_map = None
        index_columns = None

        sqlite_schema = create_sqlite_schema_data_from_arrow_table(
            table=arrow_table, index_columns=index_columns, column_map=column_map
        )

        db = KiaraDatabase.create_in_temp_dir()
        db._unlock_db()
        engine = db.get_sqlalchemy_engine()

        _table = sqlite_schema.create_table(table_name=table_name, engine=engine)

        with engine.connect() as conn:

            # Insert in batches to bound memory usage for large tables.
            for batch in arrow_table.to_batches(
                max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
            ):
                conn.execute(insert(_table), batch.to_pylist())
            conn.commit()

        db._lock_db()
        return db