Module table.merge in plugin tabular v0.5.1
Create a table from other tables and/or arrays.
This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary number of tables/arrays; all tables to be merged must be specified in the module configuration.
Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the desired column name as key, and a field-name in the following format as value:
- '[inputs_schema_key]' for inputs of type 'array'
- '[inputs_schema_key].orig_column_name' for inputs of type 'table'
| Author(s) | Markus Binsteiner (markus@frkl.io) | 
| Tags | tabular | 
| Python class | kiara_plugin.tabular.modules.table.MergeTableModule | 
Module configuration options
Configuration class: kiara_plugin.tabular.modules.table.MergeTableConfig 
| Name | Description | Type | Required? | Default | 
|---|---|---|---|---|
| inputs_schema | A dict describing the inputs for this merge process. | object | true | null | 
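| column_map | A map describing how to name the columns of the resulting table: the desired column name as key, the source field name as value. | object | false | null | 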
| constants | Value constants for this module. | object | false | null | 
| defaults | Value defaults for this module. | object | false | null | 
Module source code
class MergeTableModule(KiaraModule):
    """Create a table from other tables and/or arrays.

    This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary
    number of tables/arrays; all tables to be merged must be specified in the module configuration.

    Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the
    desired column name as key, and a field-name in the following format as value:
    - '[inputs_schema_key]' for inputs of type 'array'
    - '[inputs_schema_key].orig_column_name' for inputs of type 'table'

    If 'column_map' is set, only the fields it references are included in the result. If it is not set,
    every table column keeps its original name and every array is named after its inputs_schema key;
    a name clash is an error in that case.
    """

    _module_type_name = "table.merge"
    _config_cls = MergeTableConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:
        """Return one input field per entry of the configured 'inputs_schema'."""

        input_schema_models = self.get_config_value("inputs_schema")
        # The configured schema entries are serialised with model_dump() into plain dicts.
        return {k: v.model_dump() for k, v in input_schema_models.items()}

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Return the schema of the single 'table' output."""

        outputs = {
            "table": {
                "type": "table",
                "doc": "The merged table, including all source tables and columns.",
            }
        }
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
        """Merge the configured input tables/arrays into the 'table' output.

        Raises:
            KiaraProcessingException: if a source is neither a table nor an array, if column names
                clash (no 'column_map' set), if 'column_map' references a field that does not exist,
                if there are no columns to merge, or if the contributing sources differ in length.
        """

        import pyarrow as pa

        inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
        column_map: Dict[str, str] = self.get_config_value("column_map")

        # 'column_map' is optional (its default is None), so only copy it when it is set.
        column_map_final: Dict[str, str] = dict(column_map) if column_map else {}
        # Source fields referenced by 'column_map', as a set for cheap membership tests.
        wanted_fields = set(column_map_final.values())

        sources = {
            field_name: inputs.get_value_data(field_name)
            for field_name in inputs_schema.keys()
        }

        # source key -> row count; only sources that contribute at least one column are recorded.
        len_dict: Dict[str, int] = {}
        # field name ('<key>' or '<key>.<column>') -> arrow array / column
        arrays: Dict[str, Any] = {}

        for source_key, table_or_array in sources.items():
            if isinstance(table_or_array, KiaraTable):
                rows = table_or_array.num_rows
                contributed = False
                for name in table_or_array.column_names:
                    array_name = f"{source_key}.{name}"
                    if column_map and array_name not in wanted_fields:
                        job_log.add_log(
                            f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                        )
                        continue

                    arrays[array_name] = table_or_array.arrow_table.column(name)
                    contributed = True
                    if not column_map:
                        if name in column_map_final:
                            raise KiaraProcessingException(
                                f"Can't merge table, duplicate column name: {name}."
                            )
                        column_map_final[name] = array_name

                if not contributed:
                    # No column of this table ends up in the result, so (just like an ignored
                    # array) its length must not take part in the length check below.
                    continue

            elif isinstance(table_or_array, KiaraArray):
                if column_map and source_key not in wanted_fields:
                    job_log.add_log(
                        f"Ignoring array '{source_key}': not listed in column_map."
                    )
                    continue

                rows = len(table_or_array)
                arrays[source_key] = table_or_array.arrow_array

                if not column_map:
                    if source_key in column_map_final:
                        raise KiaraProcessingException(
                            f"Can't merge table, duplicate column name: {source_key}."
                        )
                    column_map_final[source_key] = source_key

            else:
                raise KiaraProcessingException(
                    f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
                )

            len_dict[source_key] = rows

        # A 'column_map' entry may point at a field that no input provides.
        missing = [ref for ref in column_map_final.values() if ref not in arrays]
        if missing:
            raise KiaraProcessingException(
                f"Can't merge table, 'column_map' references unknown field(s): {', '.join(missing)}."
            )

        if not column_map_final:
            raise KiaraProcessingException("Can't merge table, no columns to merge.")

        if len(set(len_dict.values())) > 1:
            len_str = ", ".join(f"{key} ({n})" for key, n in len_dict.items())
            raise KiaraProcessingException(
                f"Can't merge table, sources have different lengths: {len_str}"
            )

        # Column order of the result follows the (insertion) order of column_map_final.
        column_names = list(column_map_final.keys())
        columns = [arrays[ref] for ref in column_map_final.values()]

        table = pa.Table.from_arrays(arrays=columns, names=column_names)
        outputs.set_value("table", table)