Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ jobs:
- name: Start Docker containers
run: make docker-up

- name: Integration Test
- name: Rust Integration Test
run: cargo test -p paimon-integration-tests --all-targets
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full

- name: Go Integration Test
working-directory: bindings/go
run: make test

- name: Stop Docker containers
if: always()
run: make docker-down
25 changes: 24 additions & 1 deletion bindings/c/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,19 @@ pub unsafe extern "C" fn paimon_plan_free(plan: *mut paimon_plan) {
}
}

/// Return the number of data splits in a plan.
///
/// # Safety
/// `plan` must be a valid pointer from `paimon_table_scan_plan`, or null (returns 0).
#[no_mangle]
pub unsafe extern "C" fn paimon_plan_num_splits(plan: *const paimon_plan) -> usize {
if plan.is_null() {
return 0;
}
let plan_ref = &*((*plan).inner as *const Plan);
plan_ref.splits().len()
}

// ======================= TableRead ===============================

/// Free a paimon_table_read.
Expand All @@ -222,12 +235,18 @@ pub unsafe extern "C" fn paimon_table_read_free(read: *mut paimon_table_read) {
/// via `paimon_record_batch_reader_next`. This avoids loading all batches
/// into memory at once.
///
/// `offset` and `length` select a contiguous sub-range of splits from the
/// plan. The range is clamped to the available splits (out-of-range values
/// are silently adjusted).
///
/// # Safety
/// `read` and `plan` must be valid pointers from previous paimon C calls, or null (returns error).
#[no_mangle]
pub unsafe extern "C" fn paimon_table_read_to_arrow(
read: *const paimon_table_read,
plan: *const paimon_plan,
offset: usize,
length: usize,
) -> paimon_result_record_batch_reader {
if let Err(e) = check_non_null(read, "read") {
return paimon_result_record_batch_reader {
Expand All @@ -244,10 +263,14 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow(

let table = &*((*read).inner as *const Table);
let plan_ref = &*((*plan).inner as *const Plan);
let all_splits = plan_ref.splits();
let start = offset.min(all_splits.len());
let end = (offset.saturating_add(length)).min(all_splits.len());
let selected = &all_splits[start..end];

let rb = table.new_read_builder();
match rb.new_read() {
Ok(table_read) => match table_read.to_arrow(plan_ref.splits()) {
Ok(table_read) => match table_read.to_arrow(selected) {
Ok(stream) => {
let reader = Box::new(stream);
let wrapper = Box::new(paimon_record_batch_reader {
Expand Down
61 changes: 61 additions & 0 deletions bindings/go/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

RUST_ROOT := $(shell cd ../.. && pwd)
TARGET_DIR := $(RUST_ROOT)/target
GO_DIR := $(shell pwd)

# Detect platform
UNAME_S := $(shell uname -s)
UNAME_M := $(shell uname -m)

ifeq ($(UNAME_S),Darwin)
LIB_EXT := dylib
OS := darwin
else
LIB_EXT := so
OS := linux
endif

ifeq ($(UNAME_M),arm64)
ARCH := arm64
else ifeq ($(UNAME_M),aarch64)
ARCH := arm64
else
ARCH := amd64
endif

LIB_NAME := libpaimon_c.$(OS).$(ARCH).$(LIB_EXT)

.PHONY: build test clean

# Build the Rust shared library, compress with zstd, and place in Go package dir.
build:
cd $(RUST_ROOT) && cargo build -p paimon-c --release
zstd -19 -f $(TARGET_DIR)/release/libpaimon_c.$(LIB_EXT) -o $(GO_DIR)/$(LIB_NAME).zst

# Run Go integration tests.
# Requires test data: run 'make docker-up' from the repo root first.
# CGO is needed by arrow-go's cdata package (imported by the main paimon package).
PAIMON_TEST_WAREHOUSE ?= /tmp/paimon-warehouse

test: build
cd $(GO_DIR)/tests && PAIMON_TEST_WAREHOUSE=$(PAIMON_TEST_WAREHOUSE) go test -v ./...

clean:
cd $(RUST_ROOT) && cargo clean
rm -f $(GO_DIR)/libpaimon_c.*.zst
175 changes: 175 additions & 0 deletions bindings/go/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package paimon

import (
"context"
"sync"
"unsafe"

"github.com/jupiterrider/ffi"
)

// Catalog wraps a paimon FileSystemCatalog.
type Catalog struct {
ctx context.Context
lib *libRef
inner *paimonCatalog
closeOnce sync.Once
}

// NewFileSystemCatalog creates a new FileSystemCatalog for the given warehouse path.
func NewFileSystemCatalog(warehouse string) (*Catalog, error) {
ctx, lib, err := ensureLoaded()
if err != nil {
return nil, err
}
createFn := ffiCatalogNew.symbol(ctx)
inner, err := createFn(warehouse)
if err != nil {
return nil, err
}
lib.acquire()
return &Catalog{ctx: ctx, lib: lib, inner: inner}, nil
}

// Close releases the catalog resources. Safe to call multiple times.
func (c *Catalog) Close() {
c.closeOnce.Do(func() {
ffiCatalogFree.symbol(c.ctx)(c.inner)
c.inner = nil
c.lib.release()
})
}

// GetTable retrieves a table from the catalog using the given identifier.
func (c *Catalog) GetTable(id Identifier) (*Table, error) {
if c.inner == nil {
return nil, ErrClosed
}
createIdFn := ffiIdentifierNew.symbol(c.ctx)
cID, err := createIdFn(id.database, id.object)
if err != nil {
return nil, err
}
defer ffiIdentifierFree.symbol(c.ctx)(cID)

getFn := ffiCatalogGetTable.symbol(c.ctx)
inner, err := getFn(c.inner, cID)
if err != nil {
return nil, err
}
c.lib.acquire()
return &Table{ctx: c.ctx, lib: c.lib, inner: inner}, nil
}

var ffiCatalogNew = newFFI(ffiOpts{
sym: "paimon_catalog_new",
rType: &typeResultCatalogNew,
aTypes: []*ffi.Type{&ffi.TypePointer},
}, func(ctx context.Context, ffiCall ffiCall) func(warehouse string) (*paimonCatalog, error) {
return func(warehouse string) (*paimonCatalog, error) {
byteWarehouse, err := bytePtrFromString(warehouse)
if err != nil {
return nil, err
}
var result resultCatalogNew
ffiCall(
unsafe.Pointer(&result),
unsafe.Pointer(&byteWarehouse),
)
if result.error != nil {
return nil, parseError(ctx, result.error)
}
return result.catalog, nil
}
})

var ffiCatalogFree = newFFI(ffiOpts{
sym: "paimon_catalog_free",
rType: &ffi.TypeVoid,
aTypes: []*ffi.Type{&ffi.TypePointer},
}, func(_ context.Context, ffiCall ffiCall) func(catalog *paimonCatalog) {
return func(catalog *paimonCatalog) {
ffiCall(
nil,
unsafe.Pointer(&catalog),
)
}
})

var ffiIdentifierNew = newFFI(ffiOpts{
sym: "paimon_identifier_new",
rType: &typeResultIdentifierNew,
aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
}, func(ctx context.Context, ffiCall ffiCall) func(database, object string) (*paimonIdentifier, error) {
return func(database, object string) (*paimonIdentifier, error) {
byteDB, err := bytePtrFromString(database)
if err != nil {
return nil, err
}
byteObj, err := bytePtrFromString(object)
if err != nil {
return nil, err
}
var result resultIdentifierNew
ffiCall(
unsafe.Pointer(&result),
unsafe.Pointer(&byteDB),
unsafe.Pointer(&byteObj),
)
if result.error != nil {
return nil, parseError(ctx, result.error)
}
return result.identifier, nil
}
})

var ffiIdentifierFree = newFFI(ffiOpts{
sym: "paimon_identifier_free",
rType: &ffi.TypeVoid,
aTypes: []*ffi.Type{&ffi.TypePointer},
}, func(_ context.Context, ffiCall ffiCall) func(id *paimonIdentifier) {
return func(id *paimonIdentifier) {
ffiCall(
nil,
unsafe.Pointer(&id),
)
}
})

var ffiCatalogGetTable = newFFI(ffiOpts{
sym: "paimon_catalog_get_table",
rType: &typeResultGetTable,
aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
}, func(ctx context.Context, ffiCall ffiCall) func(catalog *paimonCatalog, id *paimonIdentifier) (*paimonTable, error) {
return func(catalog *paimonCatalog, id *paimonIdentifier) (*paimonTable, error) {
var result resultGetTable
ffiCall(
unsafe.Pointer(&result),
unsafe.Pointer(&catalog),
unsafe.Pointer(&id),
)
if result.error != nil {
return nil, parseError(ctx, result.error)
}
return result.table, nil
}
})
31 changes: 31 additions & 0 deletions bindings/go/embed_darwin_amd64.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//go:build darwin && amd64

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package paimon

import _ "embed"

//go:embed libpaimon_c.darwin.amd64.dylib.zst
var libPaimonZst []byte

func tempFilePattern() string {
return "libpaimon_c*.dylib"
}
31 changes: 31 additions & 0 deletions bindings/go/embed_darwin_arm64.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//go:build darwin && arm64

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package paimon

import _ "embed"

//go:embed libpaimon_c.darwin.arm64.dylib.zst
var libPaimonZst []byte

func tempFilePattern() string {
return "libpaimon_c*.dylib"
}
Loading
Loading