Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datafusion import SessionContext
from datafusion_ffi_example import MyTableProviderFactory


def test_table_provider_factory_ffi() -> None:
    """Register the FFI-backed factory and drive it through SQL DDL end to end."""
    session = SessionContext()
    factory = MyTableProviderFactory()

    # Registered in upper case; the DDL below spells the format in lower case.
    session.register_table_factory("MY_FORMAT", factory)

    # Creating the external table goes through the factory registered above.
    create_stmt = """
CREATE EXTERNAL TABLE
foo
STORED AS my_format
LOCATION '';
"""
    session.sql(create_stmt).collect()

    # The table comes pre-populated, so it can be queried straight away.
    batches = session.sql("SELECT * FROM foo;").collect()
    assert len(batches) == 2
    assert batches[0].num_columns == 2
3 changes: 3 additions & 0 deletions examples/datafusion-ffi-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use crate::catalog_provider::{FixedSchemaProvider, MyCatalogProvider, MyCatalogP
use crate::scalar_udf::IsNullUDF;
use crate::table_function::MyTableFunction;
use crate::table_provider::MyTableProvider;
use crate::table_provider_factory::MyTableProviderFactory;
use crate::window_udf::MyRankUDF;

pub(crate) mod aggregate_udf;
pub(crate) mod catalog_provider;
pub(crate) mod scalar_udf;
pub(crate) mod table_function;
pub(crate) mod table_provider;
pub(crate) mod table_provider_factory;
pub(crate) mod utils;
pub(crate) mod window_udf;

Expand All @@ -37,6 +39,7 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
pyo3_log::init();

m.add_class::<MyTableProvider>()?;
m.add_class::<MyTableProviderFactory>()?;
m.add_class::<MyTableFunction>()?;
m.add_class::<MyCatalogProvider>()?;
m.add_class::<MyCatalogProviderList>()?;
Expand Down
87 changes: 87 additions & 0 deletions examples/datafusion-ffi-example/src/table_provider_factory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use async_trait::async_trait;
use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
use datafusion_common::error::Result as DataFusionResult;
use datafusion_expr::CreateExternalTable;
use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory;
use pyo3::types::PyCapsule;
use pyo3::{Bound, PyAny, PyResult, Python, pyclass, pymethods};

use crate::catalog_provider;
use crate::utils::ffi_logical_codec_from_pycapsule;

/// Field-less example factory; its `TableProviderFactory` implementation is
/// what gets exported to Python through the FFI capsule.
#[derive(Debug)]
pub(crate) struct ExampleTableProviderFactory;

impl ExampleTableProviderFactory {
    /// Build the factory. There is nothing to configure.
    fn new() -> Self {
        Self
    }
}

#[async_trait]
impl TableProviderFactory for ExampleTableProviderFactory {
    /// Return the table built by `catalog_provider::my_table`.
    ///
    /// Neither the session nor the `CREATE EXTERNAL TABLE` command is
    /// consulted, so every call yields the same kind of table.
    async fn create(
        &self,
        _state: &dyn Session,
        _cmd: &CreateExternalTable,
    ) -> DataFusionResult<Arc<dyn TableProvider>> {
        let provider: Arc<dyn TableProvider> = catalog_provider::my_table();
        Ok(provider)
    }
}

/// Python-visible wrapper around [`ExampleTableProviderFactory`].
///
/// Exposed as `datafusion_ffi_example.MyTableProviderFactory` and may be
/// subclassed from Python.
#[pyclass(
    name = "MyTableProviderFactory",
    module = "datafusion_ffi_example",
    subclass
)]
#[derive(Debug)]
pub struct MyTableProviderFactory {
    // Shared handle to the Rust factory; cloned when the FFI capsule is built.
    inner: Arc<ExampleTableProviderFactory>,
}

impl Default for MyTableProviderFactory {
    /// Wrap a newly constructed [`ExampleTableProviderFactory`] in an [`Arc`].
    fn default() -> Self {
        Self {
            inner: Arc::new(ExampleTableProviderFactory::new()),
        }
    }
}

#[pymethods]
impl MyTableProviderFactory {
    /// Python constructor; identical to [`Default::default`].
    #[new]
    pub fn new() -> Self {
        Self::default()
    }

    /// Export the wrapped factory as a PyCapsule named
    /// `datafusion_table_provider_factory`.
    ///
    /// `codec` is converted with `ffi_logical_codec_from_pycapsule`; a
    /// conversion failure is returned to the caller as a Python error.
    pub fn __datafusion_table_provider_factory__<'py>(
        &self,
        py: Python<'py>,
        codec: Bound<PyAny>,
    ) -> PyResult<Bound<'py, PyCapsule>> {
        let capsule_name = cr"datafusion_table_provider_factory".into();
        let logical_codec = ffi_logical_codec_from_pycapsule(codec)?;

        // Clone the concrete handle first, then unsize it to the trait object
        // the FFI constructor takes.
        let concrete = Arc::clone(&self.inner);
        let shared: Arc<dyn TableProviderFactory + Send> = concrete;

        // NOTE(review): the second argument is left as `None`; its meaning is
        // defined by `FFI_TableProviderFactory::new_with_ffi_codec` — confirm.
        let exported = FFI_TableProviderFactory::new_with_ffi_codec(shared, None, logical_codec);

        PyCapsule::new(py, exported, Some(capsule_name))
    }
}
19 changes: 19 additions & 0 deletions python/datafusion/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from datafusion import DataFrame, SessionContext
from datafusion.context import TableProviderExportable
from datafusion.expr import CreateExternalTable

try:
from warnings import deprecated # Python 3.13+
Expand Down Expand Up @@ -243,6 +244,24 @@ def kind(self) -> str:
return self._inner.kind


class TableProviderFactory(ABC):
    """Base class for table provider factories written in Python.

    Concrete subclasses must override :py:meth:`create`.
    """

    @abstractmethod
    def create(self, cmd: CreateExternalTable) -> Table:
        """Build the table described by ``cmd``, a :class:`CreateExternalTable`."""


class TableProviderFactoryExportable(Protocol):
    """Type hint for object that has __datafusion_table_provider_factory__ PyCapsule.

    Objects matching this protocol are accepted by
    ``SessionContext.register_table_factory``.

    https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProviderFactory.html
    """

    # NOTE(review): the parameter is named ``session``, but the Rust
    # ``register_table_factory`` binding calls this hook with a logical
    # extension codec capsule as its only argument — confirm the intended name.
    def __datafusion_table_provider_factory__(self, session: Any) -> object: ...


class CatalogProviderList(ABC):
"""Abstract class for defining a Python based Catalog Provider List."""

Expand Down
18 changes: 18 additions & 0 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
CatalogProviderExportable,
CatalogProviderList,
CatalogProviderListExportable,
TableProviderFactory,
TableProviderFactoryExportable,
)
from datafusion.dataframe import DataFrame
from datafusion.expr import sort_list_to_raw_sort_list
Expand Down Expand Up @@ -830,6 +832,22 @@ def deregister_table(self, name: str) -> None:
"""Remove a table from the session."""
self.ctx.deregister_table(name)

def register_table_factory(
    self,
    format: str,
    factory: TableProviderFactory | TableProviderFactoryExportable,
) -> None:
    """Register a table provider factory for a ``STORED AS`` format name.

    The registered factory can be referenced from SQL DDL statements executed
    against this context.

    Args:
        format: The value to be used in `STORED AS ${format}` clause.
        factory: Either a Python
            :py:class:`~datafusion.catalog.TableProviderFactory` or an object
            implementing
            :py:class:`~datafusion.catalog.TableProviderFactoryExportable`,
            i.e. one exposing ``__datafusion_table_provider_factory__``.
    """
    self.ctx.register_table_factory(format, factory)
Comment on lines +835 to +849
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No core test coverage is added for SessionContext.register_table_factory. The new test is under examples/..., but the main pytest testpaths (pyproject.toml) only include python/tests and python/datafusion, so this behavior likely won’t run in CI. Consider adding a unit/integration test under python/tests that registers a factory and exercises CREATE EXTERNAL TABLE end-to-end.

Copilot uses AI. Check for mistakes.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how I missed it earlier, but the only reason I didn't initially provide the ability for Python-based implementations was because I didn't want to make this PR messy by attempting to make FFI bindings for the CreateExternalTable logical expression. Something made me look a bit harder, and I realized it's already been wrapped, so I've gone back and added it in this commit.

Also, the robot didn't manage to realize that the FFI example bindings are tested in CI so that part was a non-issue.


def catalog_names(self) -> set[str]:
    """Return the names of the catalogs in this context as a set."""
    return self.ctx.catalog_names()
Expand Down
27 changes: 27 additions & 0 deletions python/tests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ def register_catalog(
self.catalogs[name] = catalog


class CustomTableProviderFactory(dfn.catalog.TableProviderFactory):
    """Python factory that checks the DDL table name, then returns ``create_dataset()``."""

    def create(self, cmd: dfn.expr.CreateExternalTable):
        requested_name = cmd.name()
        assert requested_name == "test_table_factory"
        return create_dataset()


def test_python_catalog_provider_list(ctx: SessionContext):
ctx.register_catalog_provider_list(CustomCatalogProviderList())

Expand Down Expand Up @@ -314,3 +320,24 @@ def my_table_function_udtf() -> Table:
assert len(result[0]) == 1
assert len(result[0][0]) == 1
assert result[0][0][0].as_py() == 3


def test_register_python_table_provider_factory(ctx: SessionContext):
    """A Python-implemented factory can back ``CREATE EXTERNAL TABLE``."""
    ctx.register_table_factory("CUSTOM_FACTORY", CustomTableProviderFactory())

    create_stmt = """
CREATE EXTERNAL TABLE test_table_factory
STORED AS CUSTOM_FACTORY
LOCATION foo;
"""
    ctx.sql(create_stmt).collect()

    batches = ctx.sql("SELECT * FROM test_table_factory;").collect()

    # Expected contents: a single batch with columns "a" and "b".
    column_a = pa.array([1, 2, 3])
    column_b = pa.array([4, 5, 6])
    expected = [pa.RecordBatch.from_arrays([column_a, column_b], names=["a", "b"])]

    assert batches == expected
42 changes: 40 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::pyarrow::FromPyArrow;
use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory};
use datafusion::common::{ScalarValue, TableReference, exec_err};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
Expand All @@ -51,6 +51,7 @@ use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList;
use datafusion_ffi::execution::FFI_TaskContextProvider;
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory;
use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;
use object_store::ObjectStore;
use pyo3::IntoPyObjectExt;
Expand All @@ -77,7 +78,7 @@ use crate::record_batch::PyRecordBatchStream;
use crate::sql::logical::PyLogicalPlan;
use crate::sql::util::replace_placeholders_with_strings;
use crate::store::StorageContexts;
use crate::table::PyTable;
use crate::table::{PyTable, RustWrappedPyTableProviderFactory};
use crate::udaf::PyAggregateUDF;
use crate::udf::PyScalarUDF;
use crate::udtf::PyTableFunction;
Expand Down Expand Up @@ -659,6 +660,43 @@ impl PySessionContext {
Ok(())
}

/// Register a table provider factory under the name `format`.
///
/// `factory` is handled in one of three ways:
/// * if it has a `__datafusion_table_provider_factory__` attribute, that
///   attribute is called with a logical extension codec capsule and the
///   returned object replaces `factory`;
/// * if the resulting object is a `PyCapsule`, it is read as an
///   `FFI_TableProviderFactory`;
/// * otherwise it is wrapped in `RustWrappedPyTableProviderFactory`.
///
/// # Errors
///
/// Propagates failures from the attribute lookup or call, from building the
/// codec capsule, and from the capsule validation steps.
pub fn register_table_factory(
    &self,
    format: &str,
    mut factory: Bound<'_, PyAny>,
) -> PyDataFusionResult<()> {
    // Objects exposing the export hook are asked for their capsule first;
    // everything below then operates on the hook's return value.
    if factory.hasattr("__datafusion_table_provider_factory__")? {
        let py = factory.py();
        let codec_capsule = create_logical_extension_capsule(py, self.logical_codec.as_ref())?;
        factory = factory
            .getattr("__datafusion_table_provider_factory__")?
            .call1((codec_capsule,))?;
    }

    let factory: Arc<dyn TableProviderFactory> =
        if let Ok(capsule) = factory.cast::<PyCapsule>().map_err(py_datafusion_err) {
            // A failed cast is not an error: it selects the `else` branch below.
            validate_pycapsule(capsule, "datafusion_table_provider_factory")?;

            let data: NonNull<FFI_TableProviderFactory> = capsule
                .pointer_checked(Some(c_str!("datafusion_table_provider_factory")))?
                .cast();
            // SAFETY (review note): the pointer was obtained via `pointer_checked`
            // with the expected capsule name, so it is presumably a live
            // `FFI_TableProviderFactory` while `capsule` is borrowed — TODO confirm
            // every producer stores exactly this type under that name.
            let factory = unsafe { data.as_ref() };
            // Convert the borrowed FFI struct into the annotated trait object.
            factory.into()
        } else {
            // Not a capsule: wrap the Python object together with a clone of
            // this context's logical codec.
            Arc::new(RustWrappedPyTableProviderFactory::new(
                factory.into(),
                self.logical_codec.clone(),
            ))
        };

    // Take `write()` on the state returned by `state_ref()` and insert the
    // factory into its table-factory map, keyed by an owned copy of `format`.
    let st = self.ctx.state_ref();
    let mut lock = st.write();
    lock.table_factories_mut()
        .insert(format.to_owned(), factory);

    Ok(())
}

pub fn register_catalog_provider_list(
&self,
mut provider: Bound<PyAny>,
Expand Down
Loading
Loading