class AssembleTablesModule(KiaraModule):
    """Assemble a 'tables' value from multiple tables.

    Depending on the module configuration, 2 or more tables can be merged into a single 'tables' value.

    Two config options control the shape of the inputs:

    - 'table_names': if set, one input 'table_<name>' is created per name, and
      the names are used as the table aliases in the result.
    - 'number_of_tables': if no names are configured, this many pairs of
      'table_name_<i>' / 'table_<i>' inputs are created (default: 2), and the
      aliases are supplied at run time.
    """

    _module_type_name = "assemble.tables"
    _config_cls = AssembleTablesConfig

    @functools.cached_property
    def _table_details(self) -> Tuple[int, Union[List[str], None]]:
        """Resolve and validate the table-related config values.

        This is the single place where the 'number_of_tables' and
        'table_names' config options are reconciled; the result is cached on
        the instance after the first successful evaluation.

        Returns:
            A tuple of (number of tables, configured table names or None/empty
            if no names were configured).

        Raises:
            KiaraException: if both options are set but disagree, if fewer
                than 2 tables would be assembled, or if the configured table
                names contain duplicates.
        """
        number_tables: Union[int, None] = self.get_config_value("number_of_tables")
        table_names: Union[None, List[str]] = self.get_config_value("table_names")

        if not table_names:
            # no names configured: fall back to the default amount of tables
            if not number_tables:
                number_tables = 2
        elif not number_tables:
            # only names configured: they determine the amount of tables
            number_tables = len(table_names)
        elif number_tables != len(table_names):
            raise KiaraException(
                "The 'number_of_tables' and length of 'table_names' config option must match."
            )

        if number_tables < 2:
            raise KiaraException("The 'number_of_tables' must be at least 2.")

        if table_names:
            # Duplicate names would map to the same 'table_<name>' input field
            # and silently yield fewer tables than configured. Names supplied
            # as inputs are checked for duplicates in 'process', so reject
            # duplicates in the config as well.
            seen = set()
            for table_name in table_names:
                if table_name in seen:
                    raise KiaraException(f"Duplicate table name: '{table_name}'")
                seen.add(table_name)

        return number_tables, table_names

    @property
    def number_of_tables(self) -> int:
        """The (validated) number of tables this module assembles."""
        number_tables, _ = self._table_details
        return number_tables

    @property
    def table_names(self) -> Union[List[str], None]:
        """The configured table names, or None/empty if none were configured."""
        _, table_names = self._table_details
        return table_names

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:
        """Create the inputs schema, depending on the module configuration.

        Without configured table names, each table gets a 'table_name_<i>'
        (string) and a 'table_<i>' (table) input, numbered from 1. With
        configured names, each name gets a single 'table_<name>' input.

        Raises:
            KiaraException: if the module configuration is invalid (see
                '_table_details').
        """
        # Both values are already validated by '_table_details', so no
        # further consistency checks are needed here.
        number_tables = self.number_of_tables
        table_names = self.table_names

        inputs_schema: Dict[str, Any] = {}
        if not table_names:
            for i in range(1, number_tables + 1):
                inputs_schema[f"table_name_{i}"] = {
                    "type": "string",
                    "doc": f"The alias for table #{i}.",
                }
                inputs_schema[f"table_{i}"] = {
                    "type": "table",
                    "doc": f"The table to merge (#{i}).",
                }
        else:
            for table_name in table_names:
                inputs_schema[f"table_{table_name}"] = {
                    "type": "table",
                    "doc": f"The table to merge for alias '{table_name}'.",
                }
        return inputs_schema

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Create the outputs schema: a single 'tables' value."""
        outputs = {
            "tables": {
                "type": "tables",
                "doc": "The assembled tables instance.",
            }
        }
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
        """Collect the input tables under their aliases and set the 'tables' output.

        Raises:
            KiaraException: if two 'table_name_<i>' inputs carry the same alias.
        """
        number_tables = self.number_of_tables
        table_names = self.table_names

        # alias -> table value (as returned by 'inputs.get_value_obj')
        tables: Dict[str, Any] = {}
        if not table_names:
            # aliases are provided at run time, via the 'table_name_<i>' inputs
            for i in range(1, number_tables + 1):
                table_name = inputs.get_value_data(f"table_name_{i}")
                table = inputs.get_value_obj(f"table_{i}")
                if table_name in tables:
                    raise KiaraException(f"Duplicate table name: '{table_name}'")
                tables[table_name] = table
        else:
            # aliases are fixed by the module configuration
            for table_name in table_names:
                table = inputs.get_value_obj(f"table_{table_name}")
                tables[table_name] = table

        outputs.set_value("tables", tables)