diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 42128d6..73f47c9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -90,12 +90,16 @@ jobs: - name: Start Docker containers run: make docker-up - - name: Integration Test + - name: Rust Integration Test run: cargo test -p paimon-integration-tests --all-targets env: RUST_LOG: DEBUG RUST_BACKTRACE: full + - name: Go Integration Test + working-directory: bindings/go + run: make test + - name: Stop Docker containers if: always() run: make docker-down diff --git a/bindings/c/src/table.rs b/bindings/c/src/table.rs index 7f645c9..7892757 100644 --- a/bindings/c/src/table.rs +++ b/bindings/c/src/table.rs @@ -205,6 +205,19 @@ pub unsafe extern "C" fn paimon_plan_free(plan: *mut paimon_plan) { } } +/// Return the number of data splits in a plan. +/// +/// # Safety +/// `plan` must be a valid pointer from `paimon_table_scan_plan`, or null (returns 0). +#[no_mangle] +pub unsafe extern "C" fn paimon_plan_num_splits(plan: *const paimon_plan) -> usize { + if plan.is_null() { + return 0; + } + let plan_ref = &*((*plan).inner as *const Plan); + plan_ref.splits().len() +} + // ======================= TableRead =============================== /// Free a paimon_table_read. @@ -222,12 +235,18 @@ pub unsafe extern "C" fn paimon_table_read_free(read: *mut paimon_table_read) { /// via `paimon_record_batch_reader_next`. This avoids loading all batches /// into memory at once. /// +/// `offset` and `length` select a contiguous sub-range of splits from the +/// plan. The range is clamped to the available splits (out-of-range values +/// are silently adjusted). +/// /// # Safety /// `read` and `plan` must be valid pointers from previous paimon C calls, or null (returns error). #[no_mangle] pub unsafe extern "C" fn paimon_table_read_to_arrow( read: *const paimon_table_read, plan: *const paimon_plan, + offset: usize, + length: usize, ) -> paimon_result_record_batch_reader { if let Err(e) = check_non_null(read, "read") { return paimon_result_record_batch_reader { @@ -244,10 +263,14 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow( let table = &*((*read).inner as *const Table); let plan_ref = &*((*plan).inner as *const Plan); + let all_splits = plan_ref.splits(); + let start = offset.min(all_splits.len()); + let end = (offset.saturating_add(length)).min(all_splits.len()); + let selected = &all_splits[start..end]; let rb = table.new_read_builder(); match rb.new_read() { - Ok(table_read) => match table_read.to_arrow(plan_ref.splits()) { + Ok(table_read) => match table_read.to_arrow(selected) { Ok(stream) => { let reader = Box::new(stream); let wrapper = Box::new(paimon_record_batch_reader { diff --git a/bindings/go/Makefile b/bindings/go/Makefile new file mode 100644 index 0000000..82c2b11 --- /dev/null +++ b/bindings/go/Makefile @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +RUST_ROOT := $(shell cd ../.. && pwd) +TARGET_DIR := $(RUST_ROOT)/target +GO_DIR := $(shell pwd) + +# Detect platform +UNAME_S := $(shell uname -s) +UNAME_M := $(shell uname -m) + +ifeq ($(UNAME_S),Darwin) + LIB_EXT := dylib + OS := darwin +else + LIB_EXT := so + OS := linux +endif + +ifeq ($(UNAME_M),arm64) + ARCH := arm64 +else ifeq ($(UNAME_M),aarch64) + ARCH := arm64 +else + ARCH := amd64 +endif + +LIB_NAME := libpaimon_c.$(OS).$(ARCH).$(LIB_EXT) + +.PHONY: build test clean + +# Build the Rust shared library, compress with zstd, and place in Go package dir. +build: + cd $(RUST_ROOT) && cargo build -p paimon-c --release + zstd -19 -f $(TARGET_DIR)/release/libpaimon_c.$(LIB_EXT) -o $(GO_DIR)/$(LIB_NAME).zst + +# Run Go integration tests. +# Requires test data: run 'make docker-up' from the repo root first. +# CGO is needed by arrow-go's cdata package (imported by the main paimon package). +PAIMON_TEST_WAREHOUSE ?= /tmp/paimon-warehouse + +test: build + cd $(GO_DIR)/tests && PAIMON_TEST_WAREHOUSE=$(PAIMON_TEST_WAREHOUSE) go test -v ./... + +clean: + cd $(RUST_ROOT) && cargo clean + rm -f $(GO_DIR)/libpaimon_c.*.zst diff --git a/bindings/go/catalog.go b/bindings/go/catalog.go new file mode 100644 index 0000000..b82035d --- /dev/null +++ b/bindings/go/catalog.go @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "sync" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// Catalog wraps a paimon FileSystemCatalog. +type Catalog struct { + ctx context.Context + lib *libRef + inner *paimonCatalog + closeOnce sync.Once +} + +// NewFileSystemCatalog creates a new FileSystemCatalog for the given warehouse path. +func NewFileSystemCatalog(warehouse string) (*Catalog, error) { + ctx, lib, err := ensureLoaded() + if err != nil { + return nil, err + } + createFn := ffiCatalogNew.symbol(ctx) + inner, err := createFn(warehouse) + if err != nil { + return nil, err + } + lib.acquire() + return &Catalog{ctx: ctx, lib: lib, inner: inner}, nil +} + +// Close releases the catalog resources. Safe to call multiple times. +func (c *Catalog) Close() { + c.closeOnce.Do(func() { + ffiCatalogFree.symbol(c.ctx)(c.inner) + c.inner = nil + c.lib.release() + }) +} + +// GetTable retrieves a table from the catalog using the given identifier. +func (c *Catalog) GetTable(id Identifier) (*Table, error) { + if c.inner == nil { + return nil, ErrClosed + } + createIdFn := ffiIdentifierNew.symbol(c.ctx) + cID, err := createIdFn(id.database, id.object) + if err != nil { + return nil, err + } + defer ffiIdentifierFree.symbol(c.ctx)(cID) + + getFn := ffiCatalogGetTable.symbol(c.ctx) + inner, err := getFn(c.inner, cID) + if err != nil { + return nil, err + } + c.lib.acquire() + return &Table{ctx: c.ctx, lib: c.lib, inner: inner}, nil +} + +var ffiCatalogNew = newFFI(ffiOpts{ + sym: "paimon_catalog_new", + rType: &typeResultCatalogNew, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(warehouse string) (*paimonCatalog, error) { + return func(warehouse string) (*paimonCatalog, error) { + byteWarehouse, err := bytePtrFromString(warehouse) + if err != nil { + return nil, err + } + var result resultCatalogNew + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&byteWarehouse), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.catalog, nil + } +}) + +var ffiCatalogFree = newFFI(ffiOpts{ + sym: "paimon_catalog_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(catalog *paimonCatalog) { + return func(catalog *paimonCatalog) { + ffiCall( + nil, + unsafe.Pointer(&catalog), + ) + } +}) + +var ffiIdentifierNew = newFFI(ffiOpts{ + sym: "paimon_identifier_new", + rType: &typeResultIdentifierNew, + aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(database, object string) (*paimonIdentifier, error) { + return func(database, object string) (*paimonIdentifier, error) { + byteDB, err := bytePtrFromString(database) + if err != nil { + return nil, err + } + byteObj, err := bytePtrFromString(object) + if err != nil { + return nil, err + } + var result resultIdentifierNew + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&byteDB), + unsafe.Pointer(&byteObj), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.identifier, nil + } +}) + +var ffiIdentifierFree = newFFI(ffiOpts{ + sym: "paimon_identifier_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(id *paimonIdentifier) { + return func(id *paimonIdentifier) { + ffiCall( + nil, + unsafe.Pointer(&id), + ) + } +}) + +var ffiCatalogGetTable = newFFI(ffiOpts{ + sym: "paimon_catalog_get_table", + rType: &typeResultGetTable, + aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(catalog *paimonCatalog, id *paimonIdentifier) (*paimonTable, error) { + return func(catalog *paimonCatalog, id *paimonIdentifier) (*paimonTable, error) { + var result resultGetTable + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&catalog), + unsafe.Pointer(&id), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.table, nil + } +}) diff --git a/bindings/go/embed_darwin_amd64.go b/bindings/go/embed_darwin_amd64.go new file mode 100644 index 0000000..304816e --- /dev/null +++ b/bindings/go/embed_darwin_amd64.go @@ -0,0 +1,31 @@ +//go:build darwin && amd64 + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import _ "embed" + +//go:embed libpaimon_c.darwin.amd64.dylib.zst +var libPaimonZst []byte + +func tempFilePattern() string { + return "libpaimon_c*.dylib" +} diff --git a/bindings/go/embed_darwin_arm64.go b/bindings/go/embed_darwin_arm64.go new file mode 100644 index 0000000..304b3a0 --- /dev/null +++ b/bindings/go/embed_darwin_arm64.go @@ -0,0 +1,31 @@ +//go:build darwin && arm64 + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import _ "embed" + +//go:embed libpaimon_c.darwin.arm64.dylib.zst +var libPaimonZst []byte + +func tempFilePattern() string { + return "libpaimon_c*.dylib" +} diff --git a/bindings/go/embed_linux_amd64.go b/bindings/go/embed_linux_amd64.go new file mode 100644 index 0000000..1f07c3f --- /dev/null +++ b/bindings/go/embed_linux_amd64.go @@ -0,0 +1,31 @@ +//go:build linux && amd64 + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import _ "embed" + +//go:embed libpaimon_c.linux.amd64.so.zst +var libPaimonZst []byte + +func tempFilePattern() string { + return "libpaimon_c*.so" +} diff --git a/bindings/go/embed_linux_arm64.go b/bindings/go/embed_linux_arm64.go new file mode 100644 index 0000000..16bcc99 --- /dev/null +++ b/bindings/go/embed_linux_arm64.go @@ -0,0 +1,31 @@ +//go:build linux && arm64 + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import _ "embed" + +//go:embed libpaimon_c.linux.arm64.so.zst +var libPaimonZst []byte + +func tempFilePattern() string { + return "libpaimon_c*.so" +} diff --git a/bindings/go/error.go b/bindings/go/error.go new file mode 100644 index 0000000..feb6ee9 --- /dev/null +++ b/bindings/go/error.go @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "errors" + "fmt" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// ErrClosed is returned when an operation is attempted on a closed resource. +var ErrClosed = errors.New("paimon: use of closed resource") + +// ErrorCode represents categories of errors from paimon. +type ErrorCode int32 + +const ( + CodeUnexpected ErrorCode = 0 + CodeUnsupported ErrorCode = 1 + CodeNotFound ErrorCode = 2 + CodeAlreadyExist ErrorCode = 3 + CodeInvalidInput ErrorCode = 4 + CodeIoError ErrorCode = 5 +) + +func parseError(ctx context.Context, err *paimonError) error { + if err == nil { + return nil + } + defer ffiErrorFree.symbol(ctx)(err) + return &Error{ + code: ErrorCode(err.code), + message: string(parseBytes(err.message)), + } +} + +// Error represents a paimon error with code and message. +type Error struct { + code ErrorCode + message string +} + +func (e *Error) Error() string { + return fmt.Sprintf("paimon error(%d): %s", e.code, e.message) +} + +// Code returns the error code. +func (e *Error) Code() ErrorCode { + return e.code +} + +// Message returns the error message. +func (e *Error) Message() string { + return e.message +} + +var ffiErrorFree = newFFI(ffiOpts{ + sym: "paimon_error_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(e *paimonError) { + return func(e *paimonError) { + ffiCall( + nil, + unsafe.Pointer(&e), + ) + } +}) diff --git a/bindings/go/ffi.go b/bindings/go/ffi.go new file mode 100644 index 0000000..cab2b54 --- /dev/null +++ b/bindings/go/ffi.go @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "errors" + "os" + "sync/atomic" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// libRef is an atomic reference counter for the loaded shared library. +// Every object that may call FFI (including during Close/Release) must +// hold a reference. The library is freed only when the count drops to zero. +type libRef struct { + count atomic.Int32 + lib uintptr +} + +func newLibRef(lib uintptr) *libRef { + r := &libRef{lib: lib} + r.count.Store(1) + return r +} + +func (r *libRef) acquire() { r.count.Add(1) } + +func (r *libRef) release() { + if r.count.Add(-1) == 0 { + _ = freeLibrary(r.lib) + } +} + +type ffiOpts struct { + sym contextKey + rType *ffi.Type + aTypes []*ffi.Type +} + +type ffiCall func(rValue unsafe.Pointer, aValues ...unsafe.Pointer) + +type contextKey string + +func (c contextKey) String() string { + return string(c) +} + +type contextWithFFI func(ctx context.Context, lib uintptr) (context.Context, error) + +// FFI is a generic type-safe wrapper for a foreign function. +type FFI[T any] struct { + opts ffiOpts + withFunc func(ctx context.Context, ffiCall ffiCall) T +} + +func newFFI[T any](opts ffiOpts, withFunc func(ctx context.Context, ffiCall ffiCall) T) *FFI[T] { + f := &FFI[T]{ + opts: opts, + withFunc: withFunc, + } + withFFIs = append(withFFIs, f.withFFI) + return f +} + +func (f *FFI[T]) symbol(ctx context.Context) T { + return ctx.Value(f.opts.sym).(T) +} + +func (f *FFI[T]) withFFI(ctx context.Context, lib uintptr) (context.Context, error) { + var cif ffi.Cif + if status := ffi.PrepCif( + &cif, + ffi.DefaultAbi, + uint32(len(f.opts.aTypes)), + f.opts.rType, + f.opts.aTypes..., + ); status != ffi.OK { + return nil, errors.New(status.String()) + } + fn, err := getProcAddress(lib, f.opts.sym.String()) + if err != nil { + return nil, err + } + val := f.withFunc(ctx, func(rValue unsafe.Pointer, aValues ...unsafe.Pointer) { + ffi.Call(&cif, fn, rValue, aValues...) + }) + return context.WithValue(ctx, f.opts.sym, val), nil +} + +var withFFIs []contextWithFFI + +func newContext(path string) (ctx context.Context, lib *libRef, err error) { + handle, err := loadLibrary(path) + if err != nil { + return + } + ctx = context.Background() + for _, withFFI := range withFFIs { + ctx, err = withFFI(ctx, handle) + if err != nil { + _ = freeLibrary(handle) + return + } + } + if removeErr := os.Remove(path); removeErr != nil && !errors.Is(removeErr, os.ErrNotExist) { + _ = freeLibrary(handle) + err = removeErr + return + } + lib = newLibRef(handle) + return +} diff --git a/bindings/go/go.mod b/bindings/go/go.mod new file mode 100644 index 0000000..31cc85a --- /dev/null +++ b/bindings/go/go.mod @@ -0,0 +1,24 @@ +module github.com/apache/paimon-rust/bindings/go + +go 1.22.4 + +require ( + github.com/apache/arrow-go/v18 v18.0.0 + github.com/ebitengine/purego v0.10.0 + github.com/jupiterrider/ffi v0.6.0 + github.com/klauspost/compress v1.17.11 + golang.org/x/sys v0.26.0 +) + +require ( + github.com/goccy/go-json v0.10.3 // indirect + github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect + golang.org/x/mod v0.21.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/tools v0.26.0 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect +) diff --git a/bindings/go/go.sum b/bindings/go/go.sum new file mode 100644 index 0000000..e605378 --- /dev/null +++ b/bindings/go/go.sum @@ -0,0 +1,57 @@ +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/apache/arrow-go/v18 v18.0.0 h1:1dBDaSbH3LtulTyOVYaBCHO3yVRwjV+TZaqn3g6V7ZM= +github.com/apache/arrow-go/v18 v18.0.0/go.mod h1:t6+cWRSmKgdQ6HsxisQjok+jBpKGhRDiqcf3p0p/F+A= +github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= +github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/ebitengine/purego v0.10.0 h1:QIw4xfpWT6GWTzaW5XEKy3HXoqrJGx1ijYHzTF0/ISU= +github.com/ebitengine/purego v0.10.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= +github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jupiterrider/ffi v0.6.0 h1:UX378KcZvH5c8qgLi9KL/bL82SZTHdRspZ+jj7bvBng= +github.com/jupiterrider/ffi v0.6.0/go.mod h1:PqZ5Go6X9by8CIXgfprxfMPYmn8oT5m2O7AA56s64bY= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= +golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= +golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= +golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/bindings/go/identifier.go b/bindings/go/identifier.go new file mode 100644 index 0000000..3ed647d --- /dev/null +++ b/bindings/go/identifier.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +// Identifier identifies a table by database and object name. +type Identifier struct { + database string + object string +} + +// NewIdentifier creates a new Identifier with the given database and object name. +func NewIdentifier(database, object string) Identifier { + return Identifier{database: database, object: object} +} diff --git a/bindings/go/lib.go b/bindings/go/lib.go new file mode 100644 index 0000000..57e6389 --- /dev/null +++ b/bindings/go/lib.go @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "bytes" + "fmt" + "io" + "os" + "sync" + + "github.com/klauspost/compress/zstd" +) + +var ( + libOnce sync.Once + libPath string + libErr error +) + +// loadEmbeddedLib decompresses the embedded shared library and writes it +// to a temp file. Called once via sync.Once. +func loadEmbeddedLib() error { + libOnce.Do(func() { + data, err := decompressLib(libPaimonZst) + if err != nil { + libErr = fmt.Errorf("paimon: failed to decompress embedded library: %w", err) + return + } + libPath, err = writeTempExec(tempFilePattern(), data) + if err != nil { + libErr = fmt.Errorf("paimon: failed to write temp library: %w", err) + return + } + }) + return libErr +} + +func decompressLib(raw []byte) ([]byte, error) { + decoder, err := zstd.NewReader(bytes.NewReader(raw)) + if err != nil { + return nil, err + } + defer decoder.Close() + return io.ReadAll(decoder) +} + +func writeTempExec(pattern string, binary []byte) (path string, err error) { + f, err := os.CreateTemp("", pattern) + if err != nil { + return "", err + } + defer f.Close() + defer func() { + if err != nil { + os.Remove(f.Name()) + } + }() + if _, err = f.Write(binary); err != nil { + return "", err + } + if err = f.Chmod(0o700); err != nil { + return "", err + } + return f.Name(), nil +} diff --git a/bindings/go/paimon.go b/bindings/go/paimon.go new file mode 100644 index 0000000..ac9ac27 --- /dev/null +++ b/bindings/go/paimon.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Package paimon provides a Go binding for Apache Paimon Rust. +// +// This binding uses purego and libffi to call into the paimon-c shared library. +// The pre-built shared library is embedded in the package and automatically +// loaded at runtime — no manual build step needed. +// +// This package requires CGO because it imports the arrow-go cdata package +// for Arrow C Data Interface support. +// +// Basic usage: +// +// catalog, err := paimon.NewFileSystemCatalog("/path/to/warehouse") +// if err != nil { log.Fatal(err) } +// defer catalog.Close() +// +// table, err := catalog.GetTable(paimon.NewIdentifier("default", "my_table")) +// if err != nil { log.Fatal(err) } +// defer table.Close() +package paimon + +import ( + "context" + "sync" +) + +var ( + globalOnce sync.Once + globalCtx context.Context + globalLib *libRef + globalErr error +) + +func ensureLoaded() (context.Context, *libRef, error) { + globalOnce.Do(func() { + if err := loadEmbeddedLib(); err != nil { + globalErr = err + return + } + globalCtx, globalLib, globalErr = newContext(libPath) + }) + return globalCtx, globalLib, globalErr +} diff --git a/bindings/go/plan.go b/bindings/go/plan.go new file mode 100644 index 0000000..218c631 --- /dev/null +++ b/bindings/go/plan.go @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// Plan holds the result of a table scan, containing data splits to read. +type Plan struct { + handle *planHandle + closeOnce sync.Once +} + +// Close releases the plan resources. Safe to call multiple times. +// DataSplits obtained from Splits() remain valid after Close. +func (p *Plan) Close() { + p.closeOnce.Do(func() { + p.handle.release() + p.handle = nil + }) +} + +// NumSplits returns the number of data splits in this plan. +func (p *Plan) NumSplits() int { + if p.handle == nil { + panic("paimon: NumSplits called on closed Plan") + } + return ffiPlanNumSplits.symbol(p.handle.ctx)(p.handle.inner) +} + +// Splits returns all data splits in this plan. The returned DataSplits +// keep the underlying plan data alive via GC-attached reference counting, +// so they remain valid even after Plan.Close() is called. +func (p *Plan) Splits() []DataSplit { + if p.handle == nil { + panic("paimon: Splits called on closed Plan") + } + n := p.NumSplits() + set := newSplitSet(p.handle) + splits := make([]DataSplit, n) + for i := 0; i < n; i++ { + splits[i] = DataSplit{set: set, index: i} + } + return splits +} + +// planHandle wraps the C plan pointer with reference counting. +// The C plan is freed when the last reference is released. +type planHandle struct { + ctx context.Context + lib *libRef + inner *paimonPlan + refs atomic.Int32 +} + +func newPlanHandle(ctx context.Context, lib *libRef, inner *paimonPlan) *planHandle { + h := &planHandle{ctx: ctx, lib: lib, inner: inner} + h.refs.Store(1) // initial ref for the creator + return h +} + +func (h *planHandle) acquire() { h.refs.Add(1) } + +func (h *planHandle) release() { + if h.refs.Add(-1) == 0 { + ffiPlanFree.symbol(h.ctx)(h.inner) + h.lib.release() + } +} + +// splitSet ties DataSplit values to a planHandle via a GC finalizer. +// When all DataSplits (and the slice backing them) become unreachable, +// the GC collects the splitSet and its finalizer releases the planHandle ref. +type splitSet struct { + handle *planHandle +} + +func newSplitSet(h *planHandle) *splitSet { + h.acquire() + s := &splitSet{handle: h} + runtime.SetFinalizer(s, (*splitSet).release) + return s +} + +func (s *splitSet) release() { + runtime.SetFinalizer(s, nil) + s.handle.release() +} + +// DataSplit identifies a single data split within a plan. +// DataSplits keep the underlying plan data alive via GC-attached +// reference counting, so they are safe to use independently. +type DataSplit struct { + set *splitSet + index int +} + +var ffiPlanFree = newFFI(ffiOpts{ + sym: "paimon_plan_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(plan *paimonPlan) { + return func(plan *paimonPlan) { + ffiCall( + nil, + unsafe.Pointer(&plan), + ) + } +}) + +var ffiPlanNumSplits = newFFI(ffiOpts{ + sym: "paimon_plan_num_splits", + rType: &ffi.TypePointer, // usize == pointer-sized on 64-bit + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(plan *paimonPlan) int { + return func(plan *paimonPlan) int { + var count uintptr + ffiCall( + unsafe.Pointer(&count), + unsafe.Pointer(&plan), + ) + return int(count) + } +}) diff --git a/bindings/go/read_builder.go b/bindings/go/read_builder.go new file mode 100644 index 0000000..20b7a0d --- /dev/null +++ b/bindings/go/read_builder.go @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "sync" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// ReadBuilder creates TableScan and TableRead instances. +type ReadBuilder struct { + ctx context.Context + lib *libRef + inner *paimonReadBuilder + closeOnce sync.Once +} + +// Close releases the read builder resources. Safe to call multiple times. +func (rb *ReadBuilder) Close() { + rb.closeOnce.Do(func() { + ffiReadBuilderFree.symbol(rb.ctx)(rb.inner) + rb.inner = nil + rb.lib.release() + }) +} + +// NewScan creates a TableScan for planning which data files to read. +func (rb *ReadBuilder) NewScan() (*TableScan, error) { + if rb.inner == nil { + return nil, ErrClosed + } + createFn := ffiReadBuilderNewScan.symbol(rb.ctx) + inner, err := createFn(rb.inner) + if err != nil { + return nil, err + } + rb.lib.acquire() + return &TableScan{ctx: rb.ctx, lib: rb.lib, inner: inner}, nil +} + +// NewRead creates a TableRead for reading data from splits. +func (rb *ReadBuilder) NewRead() (*TableRead, error) { + if rb.inner == nil { + return nil, ErrClosed + } + createFn := ffiReadBuilderNewRead.symbol(rb.ctx) + inner, err := createFn(rb.inner) + if err != nil { + return nil, err + } + rb.lib.acquire() + return &TableRead{ctx: rb.ctx, lib: rb.lib, inner: inner}, nil +} + +var ffiReadBuilderFree = newFFI(ffiOpts{ + sym: "paimon_read_builder_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder) { + return func(rb *paimonReadBuilder) { + ffiCall( + nil, + unsafe.Pointer(&rb), + ) + } +}) + +var ffiReadBuilderNewScan = newFFI(ffiOpts{ + sym: "paimon_read_builder_new_scan", + rType: &typeResultTableScan, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder) (*paimonTableScan, error) { + return func(rb *paimonReadBuilder) (*paimonTableScan, error) { + var result resultTableScan + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&rb), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.scan, nil + } +}) + +var ffiReadBuilderNewRead = newFFI(ffiOpts{ + sym: "paimon_read_builder_new_read", + rType: &typeResultNewRead, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder) (*paimonTableRead, error) { + return func(rb *paimonReadBuilder) (*paimonTableRead, error) { + var result resultNewRead + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&rb), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.read, nil + } +}) diff --git a/bindings/go/table.go b/bindings/go/table.go new file mode 100644 index 0000000..17ff7af --- /dev/null +++ b/bindings/go/table.go @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "sync" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// Table represents a paimon table. +type Table struct { + ctx context.Context + lib *libRef + inner *paimonTable + closeOnce sync.Once +} + +// Close releases the table resources. Safe to call multiple times. +func (t *Table) Close() { + t.closeOnce.Do(func() { + ffiTableFree.symbol(t.ctx)(t.inner) + t.inner = nil + t.lib.release() + }) +} + +// NewReadBuilder creates a ReadBuilder for this table. +func (t *Table) NewReadBuilder() (*ReadBuilder, error) { + if t.inner == nil { + return nil, ErrClosed + } + createFn := ffiTableNewReadBuilder.symbol(t.ctx) + inner, err := createFn(t.inner) + if err != nil { + return nil, err + } + t.lib.acquire() + return &ReadBuilder{ctx: t.ctx, lib: t.lib, inner: inner}, nil +} + +var ffiTableFree = newFFI(ffiOpts{ + sym: "paimon_table_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(table *paimonTable) { + return func(table *paimonTable) { + ffiCall( + nil, + unsafe.Pointer(&table), + ) + } +}) + +var ffiTableNewReadBuilder = newFFI(ffiOpts{ + sym: "paimon_table_new_read_builder", + rType: &typeResultReadBuilder, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(table *paimonTable) (*paimonReadBuilder, error) { + return func(table *paimonTable) (*paimonReadBuilder, error) { + var result resultReadBuilder + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&table), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.readBuilder, nil + } +}) diff --git a/bindings/go/table_read.go b/bindings/go/table_read.go new file mode 100644 index 0000000..111eaec --- /dev/null +++ b/bindings/go/table_read.go @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "errors" + "io" + "runtime" + "sort" + "sync" + "unsafe" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/cdata" + "github.com/jupiterrider/ffi" +) + +// TableRead reads data from splits produced by a TableScan. +type TableRead struct { + ctx context.Context + lib *libRef + inner *paimonTableRead + closeOnce sync.Once +} + +// Close releases the table read resources. Safe to call multiple times. +func (tr *TableRead) Close() { + tr.closeOnce.Do(func() { + ffiTableReadFree.symbol(tr.ctx)(tr.inner) + tr.inner = nil + tr.lib.release() + }) +} + +// NewRecordBatchReader creates a RecordBatchReader that iterates over Arrow +// record batches for the given data splits. The splits can be non-contiguous +// and in any order. All splits must originate from the same Plan. +func (tr *TableRead) NewRecordBatchReader(splits []DataSplit) (*RecordBatchReader, error) { + if tr.inner == nil { + return nil, ErrClosed + } + if len(splits) == 0 { + return nil, errors.New("paimon: splits must not be empty") + } + + // All splits must share the same plan handle. + handle := splits[0].set.handle + indices := make([]int, len(splits)) + for i, s := range splits { + if s.set.handle != handle { + return nil, errors.New("paimon: all splits must originate from the same Plan") + } + indices[i] = s.index + } + sort.Ints(indices) + + // Group sorted indices into contiguous ranges. + type spanRange struct{ offset, length int } + ranges := []spanRange{{offset: indices[0], length: 1}} + for _, idx := range indices[1:] { + last := &ranges[len(ranges)-1] + if idx == last.offset+last.length { + last.length++ + } else { + ranges = append(ranges, spanRange{offset: idx, length: 1}) + } + } + + // Create one C reader per contiguous range. + createFn := ffiTableReadToArrow.symbol(tr.ctx) + readers := make([]*paimonRecordBatchReader, 0, len(ranges)) + for _, r := range ranges { + inner, err := createFn(tr.inner, handle.inner, uintptr(r.offset), uintptr(r.length)) + if err != nil { + // Free already-created readers on error. + freeFn := ffiRecordBatchReaderFree.symbol(tr.ctx) + for _, rd := range readers { + freeFn(rd) + tr.lib.release() + } + return nil, err + } + tr.lib.acquire() + readers = append(readers, inner) + } + + return &RecordBatchReader{ctx: tr.ctx, lib: tr.lib, readers: readers}, nil +} + +// RecordBatchReader iterates over Arrow record batches one at a time via +// the Arrow C Data Interface (zero-copy). Call NextRecord to advance and +// Close when done. +type RecordBatchReader struct { + ctx context.Context + lib *libRef + readers []*paimonRecordBatchReader + current int + closeOnce sync.Once +} + +// NextRecord returns the next Arrow record, or io.EOF when iteration is +// complete. The underlying C batch is imported via the Arrow C Data Interface +// and released automatically — the caller only needs to call Release on the +// returned arrow.Record when done. +func (r *RecordBatchReader) NextRecord() (arrow.Record, error) { + if r.readers == nil { + return nil, ErrClosed + } + batch, err := r.next() + if err != nil { + return nil, err + } + record, err := cdata.ImportCRecordBatch( + (*cdata.CArrowArray)(batch.array), + (*cdata.CArrowSchema)(batch.schema), + ) + batch.release() + if err != nil { + return nil, err + } + return record, nil +} + +func (r *RecordBatchReader) next() (*arrowBatch, error) { + nextFn := ffiRecordBatchReaderNext.symbol(r.ctx) + for r.current < len(r.readers) { + array, schema, err := nextFn(r.readers[r.current]) + if err != nil { + return nil, err + } + if array == nil && schema == nil { + r.current++ + continue + } + r.lib.acquire() + ab := &arrowBatch{ctx: r.ctx, lib: r.lib, array: array, schema: schema} + runtime.SetFinalizer(ab, (*arrowBatch).release) + return ab, nil + } + return nil, io.EOF +} + +// Close releases the underlying C record batch readers. Safe to call multiple times. +func (r *RecordBatchReader) Close() { + r.closeOnce.Do(func() { + freeFn := ffiRecordBatchReaderFree.symbol(r.ctx) + for _, rd := range r.readers { + freeFn(rd) + r.lib.release() + } + r.readers = nil + }) +} + +var ffiTableReadFree = newFFI(ffiOpts{ + sym: "paimon_table_read_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(read *paimonTableRead) { + return func(read *paimonTableRead) { + ffiCall( + nil, + unsafe.Pointer(&read), + ) + } +}) + +var ffiTableReadToArrow = newFFI(ffiOpts{ + sym: "paimon_table_read_to_arrow", + rType: &typeResultRecordBatchReader, + aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer, &ffi.TypePointer, &ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(read *paimonTableRead, plan *paimonPlan, offset uintptr, length uintptr) (*paimonRecordBatchReader, error) { + return func(read *paimonTableRead, plan *paimonPlan, offset uintptr, length uintptr) (*paimonRecordBatchReader, error) { + var result resultRecordBatchReader + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&read), + unsafe.Pointer(&plan), + unsafe.Pointer(&offset), + unsafe.Pointer(&length), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.reader, nil + } +}) + +var ffiRecordBatchReaderNext = newFFI(ffiOpts{ + sym: "paimon_record_batch_reader_next", + rType: &typeResultNextBatch, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(reader *paimonRecordBatchReader) (unsafe.Pointer, unsafe.Pointer, error) { + return func(reader *paimonRecordBatchReader) (unsafe.Pointer, unsafe.Pointer, error) { + var result resultNextBatch + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&reader), + ) + if result.error != nil { + return nil, nil, parseError(ctx, result.error) + } + return result.array, result.schema, nil + } +}) + +var ffiRecordBatchReaderFree = newFFI(ffiOpts{ + sym: "paimon_record_batch_reader_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(reader *paimonRecordBatchReader) { + return func(reader *paimonRecordBatchReader) { + ffiCall( + nil, + unsafe.Pointer(&reader), + ) + } +}) + +var ffiArrowBatchFree = newFFI(ffiOpts{ + sym: "paimon_arrow_batch_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&typeArrowBatch}, +}, func(_ context.Context, ffiCall ffiCall) func(array unsafe.Pointer, schema unsafe.Pointer) { + return func(array unsafe.Pointer, schema unsafe.Pointer) { + batch := struct { + array unsafe.Pointer + schema unsafe.Pointer + }{array: array, schema: schema} + ffiCall( + nil, + unsafe.Pointer(&batch), + ) + } +}) diff --git a/bindings/go/table_scan.go b/bindings/go/table_scan.go new file mode 100644 index 0000000..9c87214 --- /dev/null +++ b/bindings/go/table_scan.go @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "sync" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// TableScan scans a table and produces a Plan containing data splits. +type TableScan struct { + ctx context.Context + lib *libRef + inner *paimonTableScan + closeOnce sync.Once +} + +// Close releases the table scan resources. Safe to call multiple times. +func (ts *TableScan) Close() { + ts.closeOnce.Do(func() { + ffiTableScanFree.symbol(ts.ctx)(ts.inner) + ts.inner = nil + ts.lib.release() + }) +} + +// Plan executes the scan and returns a Plan containing data splits to read. +func (ts *TableScan) Plan() (*Plan, error) { + if ts.inner == nil { + return nil, ErrClosed + } + planFn := ffiTableScanPlan.symbol(ts.ctx) + inner, err := planFn(ts.inner) + if err != nil { + return nil, err + } + ts.lib.acquire() + return &Plan{handle: newPlanHandle(ts.ctx, ts.lib, inner)}, nil +} + +var ffiTableScanFree = newFFI(ffiOpts{ + sym: "paimon_table_scan_free", + rType: &ffi.TypeVoid, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(_ context.Context, ffiCall ffiCall) func(scan *paimonTableScan) { + return func(scan *paimonTableScan) { + ffiCall( + nil, + unsafe.Pointer(&scan), + ) + } +}) + +var ffiTableScanPlan = newFFI(ffiOpts{ + sym: "paimon_table_scan_plan", + rType: &typeResultPlan, + aTypes: []*ffi.Type{&ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(scan *paimonTableScan) (*paimonPlan, error) { + return func(scan *paimonTableScan) (*paimonPlan, error) { + var result resultPlan + ffiCall( + unsafe.Pointer(&result), + unsafe.Pointer(&scan), + ) + if result.error != nil { + return nil, parseError(ctx, result.error) + } + return result.plan, nil + } +}) diff --git a/bindings/go/tests/go.mod b/bindings/go/tests/go.mod new file mode 100644 index 0000000..d2d340a --- /dev/null +++ b/bindings/go/tests/go.mod @@ -0,0 +1,27 @@ +module paimon_test + +go 1.22.4 + +require ( + github.com/apache/arrow-go/v18 v18.0.0 + github.com/apache/paimon-rust/bindings/go v0.0.0 +) + +require ( + github.com/ebitengine/purego v0.10.0 // indirect + github.com/goccy/go-json v0.10.3 // indirect + github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/jupiterrider/ffi v0.6.0 // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect + golang.org/x/mod v0.21.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/tools v0.26.0 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect +) + +replace github.com/apache/paimon-rust/bindings/go => ../ diff --git a/bindings/go/tests/go.sum b/bindings/go/tests/go.sum new file mode 100644 index 0000000..e605378 --- /dev/null +++ b/bindings/go/tests/go.sum @@ -0,0 +1,57 @@ +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/apache/arrow-go/v18 v18.0.0 h1:1dBDaSbH3LtulTyOVYaBCHO3yVRwjV+TZaqn3g6V7ZM= +github.com/apache/arrow-go/v18 v18.0.0/go.mod h1:t6+cWRSmKgdQ6HsxisQjok+jBpKGhRDiqcf3p0p/F+A= +github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= +github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/ebitengine/purego v0.10.0 h1:QIw4xfpWT6GWTzaW5XEKy3HXoqrJGx1ijYHzTF0/ISU= +github.com/ebitengine/purego v0.10.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= +github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jupiterrider/ffi v0.6.0 h1:UX378KcZvH5c8qgLi9KL/bL82SZTHdRspZ+jj7bvBng= +github.com/jupiterrider/ffi v0.6.0/go.mod h1:PqZ5Go6X9by8CIXgfprxfMPYmn8oT5m2O7AA56s64bY= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= +golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= +golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= +golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= +golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/bindings/go/tests/paimon_test.go b/bindings/go/tests/paimon_test.go new file mode 100644 index 0000000..28e3fc2 --- /dev/null +++ b/bindings/go/tests/paimon_test.go @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon_test + +import ( + "errors" + "io" + "os" + "sort" + "testing" + + "github.com/apache/arrow-go/v18/arrow/array" + paimon "github.com/apache/paimon-rust/bindings/go" +) + +// TestReadLogTable reads the test table and verifies the data matches expected values. +// +// The table was populated by Docker provisioning with: +// +// (1, 'alice'), (2, 'bob'), (3, 'carol') +func TestReadLogTable(t *testing.T) { + warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE") + if warehouse == "" { + warehouse = "/tmp/paimon-warehouse" + } + + if _, err := os.Stat(warehouse); os.IsNotExist(err) { + t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse) + } + + catalog, err := paimon.NewFileSystemCatalog(warehouse) + if err != nil { + t.Fatalf("Failed to create catalog: %v", err) + } + defer catalog.Close() + + table, err := catalog.GetTable(paimon.NewIdentifier("default", "simple_log_table")) + if err != nil { + t.Fatalf("Failed to get table: %v", err) + } + defer table.Close() + + rb, err := table.NewReadBuilder() + if err != nil { + t.Fatalf("Failed to create read builder: %v", err) + } + defer rb.Close() + + scan, err := rb.NewScan() + if err != nil { + t.Fatalf("Failed to create scan: %v", err) + } + defer scan.Close() + + plan, err := scan.Plan() + if err != nil { + t.Fatalf("Failed to plan: %v", err) + } + defer plan.Close() + + splits := plan.Splits() + if len(splits) == 0 { + t.Fatal("Expected at least one split") + } + + read, err := rb.NewRead() + if err != nil { + t.Fatalf("Failed to create table read: %v", err) + } + defer read.Close() + + reader, err := read.NewRecordBatchReader(splits) + if err != nil { + t.Fatalf("Failed to create record batch reader: %v", err) + } + defer reader.Close() + + // Import Arrow batches via C Data Interface and collect rows. + // Strings are copied before Release because arrow-go's String.Value() + // returns zero-copy references into the Arrow buffer. + type row struct { + id int32 + name string + } + var rows []row + batchIdx := 0 + for { + record, err := reader.NextRecord() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("Batch %d: failed to read next record: %v", batchIdx, err) + } + + idIdx := record.Schema().FieldIndices("id") + nameIdx := record.Schema().FieldIndices("name") + if len(idIdx) == 0 || len(nameIdx) == 0 { + record.Release() + t.Fatalf("Batch %d: missing expected columns (id, name) in schema: %s", batchIdx, record.Schema()) + } + + idCol := record.Column(idIdx[0]).(*array.Int32) + nameCol := record.Column(nameIdx[0]).(*array.String) + + for j := 0; j < int(record.NumRows()); j++ { + rows = append(rows, row{ + id: idCol.Value(j), + name: string([]byte(nameCol.Value(j))), + }) + } + record.Release() + batchIdx++ + } + + if len(rows) == 0 { + t.Fatal("Expected at least one row, got 0") + } + + sort.Slice(rows, func(i, j int) bool { + return rows[i].id < rows[j].id + }) + + expected := []row{ + {1, "alice"}, + {2, "bob"}, + {3, "carol"}, + } + + if len(rows) != len(expected) { + t.Fatalf("Expected %d rows, got %d: %v", len(expected), len(rows), rows) + } + + for i, exp := range expected { + if rows[i] != exp { + t.Errorf("Row %d: expected %v, got %v", i, exp, rows[i]) + } + } +} diff --git a/bindings/go/types.go b/bindings/go/types.go new file mode 100644 index 0000000..fdc8990 --- /dev/null +++ b/bindings/go/types.go @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "context" + "runtime" + "unsafe" + + "github.com/jupiterrider/ffi" +) + +// FFI type definitions mirroring C repr structs from paimon-c. +var ( + // Result types: { value, *error } + // paimon_result_catalog_new { catalog: paimon_catalog, error: *paimon_error } + typeResultCatalogNew = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_result_get_table { table: paimon_table, error: *paimon_error } + typeResultGetTable = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_result_identifier_new { identifier: paimon_identifier, error: *paimon_error } + typeResultIdentifierNew = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_result_new_read { read: paimon_table_read, error: *paimon_error } + typeResultNewRead = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_result_read_builder { read_builder: paimon_read_builder, error: *paimon_error } + typeResultReadBuilder = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_result_table_scan { scan: paimon_table_scan, error: *paimon_error } + typeResultTableScan = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_result_plan { plan: paimon_plan, error: *paimon_error } + typeResultPlan = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_result_record_batch_reader { reader: *paimon_record_batch_reader, error: *paimon_error } + typeResultRecordBatchReader = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_arrow_batch { array: *c_void, schema: *c_void } + typeArrowBatch = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, + &ffi.TypePointer, + nil, + }[0], + } + + // paimon_result_next_batch { batch: paimon_arrow_batch, error: *paimon_error } + typeResultNextBatch = ffi.Type{ + Type: ffi.Struct, + Elements: &[]*ffi.Type{ + &ffi.TypePointer, // batch.array + &ffi.TypePointer, // batch.schema + &ffi.TypePointer, // error + nil, + }[0], + } +) + +// Go mirror structs for C types. + +type paimonBytes struct { + data *byte + len uintptr +} + +type paimonError struct { + code int32 + message paimonBytes +} + +// Opaque pointer wrappers +type paimonCatalog struct{} +type paimonIdentifier struct{} +type paimonTable struct{} +type paimonReadBuilder struct{} +type paimonTableScan struct{} +type paimonTableRead struct{} +type paimonPlan struct{} +type paimonRecordBatchReader struct{} + +// Result types matching the C repr structs +type resultCatalogNew struct { + catalog *paimonCatalog + error *paimonError +} + +type resultGetTable struct { + table *paimonTable + error *paimonError +} + +type resultIdentifierNew struct { + identifier *paimonIdentifier + error *paimonError +} + +type resultNewRead struct { + read *paimonTableRead + error *paimonError +} + +type resultReadBuilder struct { + readBuilder *paimonReadBuilder + error *paimonError +} + +type resultTableScan struct { + scan *paimonTableScan + error *paimonError +} + +type resultPlan struct { + plan *paimonPlan + error *paimonError +} + +type resultRecordBatchReader struct { + reader *paimonRecordBatchReader + error *paimonError +} + +// arrowBatch holds a single Arrow record batch via the Arrow C Data Interface. +type arrowBatch struct { + ctx context.Context + lib *libRef + array unsafe.Pointer + schema unsafe.Pointer + released bool +} + +func (b *arrowBatch) release() { + if b.released { + return + } + b.released = true + runtime.SetFinalizer(b, nil) + ffiArrowBatchFree.symbol(b.ctx)(b.array, b.schema) + b.lib.release() +} + +type resultNextBatch struct { + array unsafe.Pointer + schema unsafe.Pointer + error *paimonError +} + +func parseBytes(b paimonBytes) []byte { + if b.len == 0 { + return nil + } + data := make([]byte, b.len) + copy(data, unsafe.Slice(b.data, b.len)) + return data +} diff --git a/bindings/go/util_unix.go b/bindings/go/util_unix.go new file mode 100644 index 0000000..ec3c63a --- /dev/null +++ b/bindings/go/util_unix.go @@ -0,0 +1,52 @@ +//go:build !windows + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package paimon + +import ( + "github.com/ebitengine/purego" + "golang.org/x/sys/unix" +) + +func bytePtrFromString(s string) (*byte, error) { + if s == "" { + return new(byte), nil + } + return unix.BytePtrFromString(s) +} + +func loadLibrary(path string) (uintptr, error) { + return purego.Dlopen(path, purego.RTLD_LAZY|purego.RTLD_LOCAL) +} + +func freeLibrary(handle uintptr) error { + if handle == 0 { + return nil + } + return purego.Dlclose(handle) +} + +func getProcAddress(handle uintptr, name string) (uintptr, error) { + if handle == 0 { + return 0, nil + } + return purego.Dlsym(handle, name) +}