class MergeTableModule(KiaraModule):
    """Create a table from other tables and/or arrays.

    This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary
    number of tables/arrays, all tables to be merged must be specified in the module configuration.

    Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the
    desired column name as key, and a field-name in the following format as value:

    - '[inputs_schema key]' for inputs of type 'array'
    - '[inputs_schema_key].orig_column_name' for inputs of type 'table'
    """

    _module_type_name = "table.merge"
    _config_cls = MergeTableConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:
        """Build the inputs schema from the 'inputs_schema' config value.

        Every configured schema model is converted to a plain dict by calling
        its ``model_dump()`` method; the field names are kept as-is.
        """
        input_schema_models = self.get_config_value("inputs_schema")
        return {k: v.model_dump() for k, v in input_schema_models.items()}

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Return the outputs schema: a single 'table' field of type 'table'."""
        return {
            "table": {
                "type": "table",
                "doc": "The merged table, including all source tables and columns.",
            }
        }

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
        """Merge all configured input tables/arrays column-wise into one table.

        Each field listed in the 'inputs_schema' config is read from ``inputs``.
        Table columns are registered under the reference '<field>.<column>',
        arrays under '<field>'. If 'column_map' is non-empty, only the
        references it lists are used (everything else is logged and ignored) and
        its keys become the result column names. If 'column_map' is empty, all
        columns/arrays are used and keep their own names.

        The result is stored in the 'table' output field.

        Raises:
            KiaraProcessingException: if an input is neither a ``KiaraTable`` nor
                a ``KiaraArray``, if (without a column map) two sources produce
                the same column name, if there is nothing to merge, if the
                sources differ in length, or if the column map references an
                array that none of the inputs provides.
        """
        import pyarrow as pa

        inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
        column_map: Dict[str, str] = self.get_config_value("column_map")

        sources = {
            field_name: inputs.get_value_data(field_name)
            for field_name in inputs_schema.keys()
        }

        # Build the lookup set once instead of scanning column_map.values()
        # for every single column.
        requested_refs = set(column_map.values())

        len_dict: Dict[str, int] = {}  # source field name -> number of rows
        arrays: Dict[str, Any] = {}  # array reference -> arrow column/array
        # result column name -> array reference; starts as the user-provided
        # map and is auto-populated below when no map was configured.
        column_map_final = dict(column_map)

        for source_key, table_or_array in sources.items():
            if isinstance(table_or_array, KiaraTable):
                rows = table_or_array.num_rows
                for name in table_or_array.column_names:
                    array_name = f"{source_key}.{name}"
                    if column_map and array_name not in requested_refs:
                        job_log.add_log(
                            f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                        )
                        continue

                    arrays[array_name] = table_or_array.arrow_table.column(name)
                    if not column_map:
                        if name in column_map_final:
                            # Raised as KiaraProcessingException for consistency
                            # with the other failures in this method.
                            raise KiaraProcessingException(
                                f"Can't merge table, duplicate column name: {name}."
                            )
                        column_map_final[name] = array_name

            elif isinstance(table_or_array, KiaraArray):
                if column_map and source_key not in requested_refs:
                    job_log.add_log(
                        f"Ignoring array '{source_key}': not listed in column_map."
                    )
                    # An ignored array does not take part in the length check.
                    continue

                rows = len(table_or_array)
                arrays[source_key] = table_or_array.arrow_array
                if not column_map:
                    if source_key in column_map_final:
                        raise KiaraProcessingException(
                            f"Can't merge table, duplicate column name: {source_key}."
                        )
                    column_map_final[source_key] = source_key

            else:
                raise KiaraProcessingException(
                    f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
                )

            len_dict[source_key] = rows

        if not len_dict:
            raise KiaraProcessingException(
                "Can't merge table: no sources to merge."
            )

        # All sources must have the same number of rows to be merged
        # column-wise.
        if len(set(len_dict.values())) > 1:
            # List every source with its length, not just the last one.
            len_str = ", ".join(
                f"{name} ({rows})" for name, rows in len_dict.items()
            )
            raise KiaraProcessingException(
                f"Can't merge table, sources have different lengths: {len_str}"
            )

        # Fail with a clear message (instead of a bare KeyError) if the column
        # map points to something none of the inputs provided.
        missing_refs = [
            ref for ref in column_map_final.values() if ref not in arrays
        ]
        if missing_refs:
            raise KiaraProcessingException(
                f"Can't merge table, column_map references unknown array(s): {', '.join(missing_refs)}."
            )

        column_names = list(column_map_final.keys())
        columns = [arrays[ref] for ref in column_map_final.values()]

        table = pa.Table.from_arrays(arrays=columns, names=column_names)
        outputs.set_value("table", table)