class CreateDatabaseModule(CreateFromModule):
_module_type_name = "create.database"
_config_cls = CreateDatabaseModuleConfig
def create__database__from__file(
self, source_value: Value, optional: ValueMap
) -> Any:
"""Create a database from a file.
Currently, only csv files are supported.
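
        Example (a sketch, not verified against a particular kiara version; assumes
        a 'KiaraAPI' instance, an onboarded csv 'file' value named 'my_csv_file'
        (illustrative), and that the operation id follows kiara's 'create.*' naming)::

            from kiara.api import KiaraAPI

            kiara = KiaraAPI.instance()
            result = kiara.run_job(
                "create.database.from.file", inputs={"file": my_csv_file}
            )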
"""
import csv as py_csv
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
            # remove the whole temporary directory, not just the db file inside it
            shutil.rmtree(temp_f, ignore_errors=True)
atexit.register(cleanup)
file_item: KiaraFile = source_value.data
if not file_item.file_name.endswith(".csv"):
raise KiaraProcessingException(
"Only csv files are supported (at the moment)."
)
table_name = file_item.file_name_without_extension
table_name = table_name.replace("-", "_")
table_name = table_name.replace(".", "_")
has_header = optional.get_value_data("first_row_is_header")
if has_header is None:
try:
has_header = True
with open(source_value.data.path, "rt") as csvfile:
sniffer = py_csv.Sniffer()
has_header = sniffer.has_header(csvfile.read(2048))
            except Exception as e:
                # TODO: add this to the process log
log_message(
"csv_sniffer.error",
file=source_value.data.path,
error=str(e),
details="assuming csv file has header",
)
try:
create_sqlite_table_from_tabular_file(
target_db_file=db_path,
file_item=file_item,
table_name=table_name,
no_headers=not has_header,
)
except Exception as e:
            if self.get_config_value("ignore_errors") is True:
log_message("ignore.import_file", file=file_item.path, reason=str(e))
else:
raise KiaraProcessingException(e)
        include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
            "include_source_metadata"
        )
        if include_raw_content_in_file_info in (None, True):
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
included_files = {file_item.file_name: file_item}
file_bundle = KiaraFileBundle.create_from_file_models(
files=included_files, bundle_name=file_item.file_name
)
insert_db_table_from_file_bundle(
database=db,
file_bundle=file_bundle,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
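
    # For orientation, a minimal standalone sketch of the csv -> sqlite step that
    # 'create_sqlite_table_from_tabular_file' performs above (an assumption about
    # its internals; the real helper also does type inference and batched inserts):
    #
    #   import csv
    #   import sqlite3
    #
    #   def csv_to_sqlite(csv_path: str, db_path: str, table_name: str) -> None:
    #       with open(csv_path, newline="") as f:
    #           rows = list(csv.reader(f))
    #       header, data = rows[0], rows[1:]
    #       columns = ", ".join(f'"{c}" TEXT' for c in header)
    #       placeholders = ", ".join("?" for _ in header)
    #       conn = sqlite3.connect(db_path)
    #       try:
    #           conn.execute(f'CREATE TABLE "{table_name}" ({columns})')
    #           conn.executemany(
    #               f'INSERT INTO "{table_name}" VALUES ({placeholders})', data
    #           )
    #           conn.commit()
    #       finally:
    #           conn.close()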
def create__database__from__file_bundle(
self, source_value: Value, job_log: JobLog
) -> Any:
"""Create a database from a file_bundle value.
        Currently, only csv files are supported; files in the source file_bundle that
        have other extensions will be ignored.

        Unless 'merge_into_single_table' is set to 'True' in the module configuration,
        each csv file creates one table in the resulting database. If that option is
        set, a single table containing the rows of all csv files is created; for this
        to work, all csv files must follow the same schema.
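
        Example (a sketch; assumes a 'KiaraAPI' instance named 'kiara' and an
        onboarded 'file_bundle' value named 'my_csv_bundle', both illustrative)::

            result = kiara.run_job(
                "create.database.from.file_bundle",
                inputs={"file_bundle": my_csv_bundle},
            )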
"""
merge_into_single_table = self.get_config_value("merge_into_single_table")
if merge_into_single_table:
raise NotImplementedError("Not supported (yet).")
include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
"include_source_metadata"
)
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
            # remove the whole temporary directory, not just the db file inside it
            shutil.rmtree(temp_f, ignore_errors=True)
atexit.register(cleanup)
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
# TODO: check whether/how to add indexes
bundle: KiaraFileBundle = source_value.data
table_names: List[str] = []
included_files: Dict[str, bool] = {}
errors: Dict[str, Union[None, str]] = {}
for rel_path in sorted(bundle.included_files.keys()):
if not rel_path.endswith(".csv"):
job_log.add_log(
f"Ignoring file (not csv): {rel_path}", log_level=logging.INFO
)
included_files[rel_path] = False
errors[rel_path] = "Not a csv file."
continue
file_item = bundle.included_files[rel_path]
table_name = find_free_id(
stem=file_item.file_name_without_extension, current_ids=table_names
)
try:
table_names.append(table_name)
create_sqlite_table_from_tabular_file(
target_db_file=db_path, file_item=file_item, table_name=table_name
)
included_files[rel_path] = True
except Exception as e:
included_files[rel_path] = False
errors[rel_path] = KiaraException.get_root_details(e)
                if self.get_config_value("ignore_errors") is True:
log_message("ignore.import_file", file=rel_path, reason=str(e))
continue
raise KiaraProcessingException(e)
        if include_raw_content_in_file_info in (None, True):
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
insert_db_table_from_file_bundle(
database=db,
file_bundle=source_value.data,
table_name="source_files_metadata",
include_content=include_content,
included_files=included_files,
errors=errors,
)
db._lock_db()
return db_path
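
    # 'find_free_id' (used above) ensures unique table names when several files
    # share a stem; a plausible minimal equivalent (an assumption, not the actual
    # kiara implementation):
    #
    #   def find_free_id(stem: str, current_ids: List[str]) -> str:
    #       if stem not in current_ids:
    #           return stem
    #       idx = 2
    #       while f"{stem}_{idx}" in current_ids:
    #           idx += 1
    #       return f"{stem}_{idx}"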
def create_optional_inputs(
        self, source_type: str, target_type: str
) -> Union[Mapping[str, Mapping[str, Any]], None]:
inputs = {}
if source_type == "file":
inputs["first_row_is_header"] = {
"type": "boolean",
"optional": True,
"doc": "Whether the first row of the file is a header row. If not provided, kiara will try to auto-determine.",
}
if target_type == "database" and source_type == "table":
inputs["table_name"] = {
"type": "string",
"doc": "The name of the table in the new database.",
"default": "imported_table",
}
return inputs
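
    # Example: for source_type="table" and target_type="database", the mapping
    # returned above is:
    #
    #   {
    #       "table_name": {
    #           "type": "string",
    #           "doc": "The name of the table in the new database.",
    #           "default": "imported_table",
    #       }
    #   }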
def create__database__from__tables(
self, source_value: Value, optional: ValueMap
) -> Any:
"""Create a database value from a list of tables."""
from kiara_plugin.tabular.utils.tables import create_database_from_tables
tables: KiaraTables = source_value.data
db = create_database_from_tables(tables=tables)
return db
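
    # A sketch of what 'create_database_from_tables' plausibly does (an assumption
    # about its internals; attribute names are illustrative), mirroring the
    # single-table path in 'create__database__from__table' below for each table:
    #
    #   db = KiaraDatabase.create_in_temp_dir()
    #   db._unlock_db()
    #   for table_name, table in tables.tables.items():
    #       ...  # create a sqlite schema from table.arrow_table, then insert batches
    #   db._lock_db()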
def create__database__from__table(
self, source_value: Value, optional: ValueMap
) -> Any:
"""Create a database value from a table."""
table_name = optional.get_value_data("table_name")
if not table_name:
table_name = DEFAULT_TABLE_NAME
table: KiaraTable = source_value.data
arrow_table = table.arrow_table
column_map = None
index_columns = None
sqlite_schema = create_sqlite_schema_data_from_arrow_table(
table=arrow_table, index_columns=index_columns, column_map=column_map
)
db = KiaraDatabase.create_in_temp_dir()
db._unlock_db()
engine = db.get_sqlalchemy_engine()
_table = sqlite_schema.create_table(table_name=table_name, engine=engine)
with engine.connect() as conn:
for batch in arrow_table.to_batches(
max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
):
conn.execute(insert(_table), batch.to_pylist())
conn.commit()
db._lock_db()
return db
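
    # The batched-insert pattern above, reduced to plain pyarrow + sqlalchemy for
    # reference (a standalone sketch; names and the sqlite path are illustrative):
    #
    #   import pyarrow as pa
    #   import sqlalchemy as sa
    #
    #   arrow_table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})
    #   engine = sa.create_engine("sqlite:///example.sqlite")
    #   metadata = sa.MetaData()
    #   sql_table = sa.Table(
    #       "example", metadata, sa.Column("id", sa.Integer), sa.Column("name", sa.Text)
    #   )
    #   metadata.create_all(engine)
    #   with engine.connect() as conn:
    #       for batch in arrow_table.to_batches(max_chunksize=1024):
    #           conn.execute(sa.insert(sql_table), batch.to_pylist())
    #       conn.commit()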