Module table.merge in plugin tabular v0.5.3
Create a table from other tables and/or arrays.
This module needs configuration to be set (for now): it's currently not possible to merge an arbitrary number of tables/arrays, so all tables and arrays to be merged must be specified in the module configuration.
Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the desired column name as key, and a field-name in the following format as value:
- '[inputs_schema_key]' for inputs of type 'array'
- '[inputs_schema_key].orig_column_name' for inputs of type 'table'
Author(s) | Markus Binsteiner (markus@frkl.io) |
Tags | tabular |
Python class | kiara_plugin.tabular.modules.table.MergeTableModule |
Module configuration options
Configuration class: kiara_plugin.tabular.modules.table.MergeTableConfig
Name | Description | Type | Required? | Default |
---|---|---|---|---|
inputs_schema | A dict describing the inputs for this merge process. | object | true | null |
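column_map | A map describing the columns of the resulting table: keys are the desired column names, values are field references ('[inputs_schema_key]' for arrays, '[inputs_schema_key].orig_column_name' for tables). | object | false | null |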
constants | Value constants for this module. | object | false | null |
defaults | Value defaults for this module. | object | false | null |
Module source code
class MergeTableModule(KiaraModule):
    """Create a table from other tables and/or arrays.

    This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary number of tables/arrays: all tables and arrays to be merged must be specified in the module configuration.

    Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the desired column name as key, and a field-name in the following format as value:

    - '[inputs_schema_key]' for inputs of type 'array'
    - '[inputs_schema_key].orig_column_name' for inputs of type 'table'

    If 'column_map' is set, only the fields it references end up in the result, in the order of the map. If it is not set, every column of every input table and every input array is included, named after the original column name (tables) or the inputs_schema key (arrays).
    """

    _module_type_name = "table.merge"
    _config_cls = MergeTableConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:
        """Build the inputs schema from the 'inputs_schema' config value, one input per key."""

        input_schema_models = self.get_config_value("inputs_schema")
        return {k: v.model_dump() for k, v in input_schema_models.items()}

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Declare the single output 'table', which holds the merged result."""

        outputs = {
            "table": {
                "type": "table",
                "doc": "The merged table, including all source tables and columns.",
            }
        }
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
        """Merge the configured input tables/arrays into a single Arrow table.

        The result is stored as the 'table' output.

        Raises:
            KiaraProcessingException: if an input is neither a KiaraTable nor a
                KiaraArray; if two columns would end up with the same name (no
                column_map set); if column_map references a field that is not
                available; if no columns were selected at all; or if the selected
                sources have different lengths.
        """
        import pyarrow as pa

        inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
        column_map: Dict[str, str] = self.get_config_value("column_map")

        # An unset column_map (None or empty) means "include everything".
        # dict(None) would raise a TypeError, so guard before copying.
        use_column_map = bool(column_map)
        column_map_final: Dict[str, str] = dict(column_map) if use_column_map else {}
        # Build the set of referenced fields once, for O(1) membership tests below.
        selected_fields = set(column_map_final.values())

        sources = {
            field_name: inputs.get_value_data(field_name)
            for field_name in inputs_schema.keys()
        }

        len_dict: Dict[str, int] = {}
        arrays: Dict[str, Any] = {}

        for source_key, table_or_array in sources.items():
            if isinstance(table_or_array, KiaraTable):
                used_any_column = False
                for name in table_or_array.column_names:
                    array_name = f"{source_key}.{name}"
                    if use_column_map and array_name not in selected_fields:
                        job_log.add_log(
                            f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                        )
                        continue

                    arrays[array_name] = table_or_array.arrow_table.column(name)
                    used_any_column = True
                    if not use_column_map:
                        if name in column_map_final:
                            raise KiaraProcessingException(
                                f"Can't merge table, duplicate column name: {name}."
                            )
                        column_map_final[name] = array_name

                if not used_any_column:
                    # Same treatment as an ignored array: a table that contributes
                    # no columns does not take part in the length check.
                    continue
                rows = table_or_array.num_rows

            elif isinstance(table_or_array, KiaraArray):
                if use_column_map and source_key not in selected_fields:
                    job_log.add_log(
                        f"Ignoring array '{source_key}': not listed in column_map."
                    )
                    continue

                rows = len(table_or_array)
                arrays[source_key] = table_or_array.arrow_array

                if not use_column_map:
                    if source_key in column_map_final:
                        raise KiaraProcessingException(
                            f"Can't merge table, duplicate column name: {source_key}."
                        )
                    column_map_final[source_key] = source_key

            else:
                raise KiaraProcessingException(
                    f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
                )

            len_dict[source_key] = rows

        # Report bad column_map references clearly instead of failing with a KeyError later.
        missing = [ref for ref in column_map_final.values() if ref not in arrays]
        if missing:
            raise KiaraProcessingException(
                f"Can't merge table, column_map references unknown field(s): {', '.join(missing)}"
            )

        if not len_dict:
            raise KiaraProcessingException("Can't merge table: no input columns selected.")

        if len(set(len_dict.values())) != 1:
            len_str = ", ".join(f"{name} ({rows})" for name, rows in len_dict.items())
            raise KiaraProcessingException(
                f"Can't merge table, sources have different lengths: {len_str}"
            )

        # Column order follows column_map_final (config order, or discovery order).
        column_names = list(column_map_final.keys())
        columns = [arrays[ref] for ref in column_map_final.values()]

        table = pa.Table.from_arrays(arrays=columns, names=column_names)

        outputs.set_value("table", table)