class PickColumnModule(KiaraModule):
    """Pick one column from a table, returning an array.

    The column to extract is named by the ``column_name`` module config
    value. When that config value is empty, the module instead exposes a
    ``column_name`` input and reads the name from there at processing time.
    """

    _module_type_name = "table.pick.column"
    _config_cls = PickColumnModuleConfig

    def create_inputs_schema(self) -> ValueMapSchema:
        """Describe the module inputs.

        Returns:
            A schema that always contains ``table``; ``column_name`` is added
            only when the module config does not already supply one.
        """
        schema: Dict[str, Any] = {"table": {"type": "table", "doc": "A table."}}

        # A configured column name makes the extra input unnecessary.
        if self.get_config_value("column_name"):
            return schema

        schema["column_name"] = {
            "type": "string",
            "doc": "The name of the column to extract.",
        }
        return schema

    def create_outputs_schema(self) -> ValueMapSchema:
        """Describe the module outputs: a single ``array`` value."""
        return {"array": {"type": "array", "doc": "The column."}}

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
        """Extract the requested column and store it in the ``array`` output.

        Args:
            inputs: Provides the ``table`` value and, when the name is not
                configured, the ``column_name`` value.
            outputs: Receives the extracted column under ``array``.

        Raises:
            KiaraProcessingException: If no column name is available, or the
                name is not among the table's column names.
        """
        import pyarrow as pa

        # The config value wins; the input is consulted only as a fallback.
        requested: Union[str, None] = self.get_config_value(
            "column_name"
        ) or inputs.get_value_data("column_name")
        if not requested:
            raise KiaraProcessingException(
                "Could not cut column from table: column_name not provided or empty string."
            )

        source: Value = inputs.get_value_obj("table")

        # Check the name against the table metadata before reading the data.
        metadata: KiaraTableMetadata = source.get_property_data("metadata.table")
        known_columns = metadata.table.column_names
        if requested not in known_columns:
            listing = ", ".join(known_columns)
            raise KiaraProcessingException(
                f"Invalid column name '{requested}'. Available column names: {listing}"
            )

        arrow_table: pa.Table = source.data.arrow_table
        outputs.set_value("array", arrow_table.column(requested))