Next Generation, sub-microsecond latency shared memory IPC.
This is a shared-memory based pub/sub Interprocess Communication system that can be used in robotics and other applications. Why subspace? If your messages are transported between processes on the same computer, they travel through extremely low latency and high bandwidth shared memory buffers, kind of like they are going faster than light (not really, of course). If they go between computers, they are transported over the network at sub-light speed.
Some of the code in this project was contributed by Cruise LLC.
It has the following features:
- Single threaded coroutine based server process written in C++17
- Coroutine-aware client library, in C++17.
- Native Rust client library with the same shared-memory performance as the C++ client.
- C client wrapper for easy integration into other language bindings.
- Publish/subscribe methodology with multiple publishers and multiple subscribers per channel.
- No communication with server for message transfer.
- Message type agnostic transmission – bring your own serialization.
- Channel types, meaningful to user, not system.
- Single lock POSIX shared memory channels
- Both unreliable and reliable communications between publishers and subscribers.
- Ability to read the next or newest message in a channel.
- File-descriptor-based event triggers.
- Automatic UDP discovery and TCP bridging of channels between servers.
- Shadow process for crash recovery -- the server can restart and resume without losing shared memory state.
- Shared and weak pointers for message references.
- Ports to macOS and Linux, ARM64 and x86_64.
- Builds using Bazel and uses Abseil and Protocol Buffers from Google.
- Uses my C++ coroutine library (https://github.com/dallison/co)
See the file docs/subspace.pdf for full documentation. Additional documentation:
- Checksums and User Metadata
- Client Architecture
- Server Architecture
- Rust Client
- Shadow Process (Crash Recovery)
Subspace can be built using either Bazel or CMake. Both build systems will automatically download and build all required dependencies.
To build with Bazel, you will need to install Bazel first. The build also needs some external libraries, but Bazel takes care of downloading them. The .bazelrc file contains some configuration options.
bazel build --config=apple_silicon ...
Subspace really wants to be built using clang but modern GCC versions work well too. Depending on how your OS is configured, you might need to tell bazel what compiler to use.
CC=clang bazel build ...
Build a minimal set of binaries:
CC=clang bazel build //server:subspace_server //manual_tests:{pub,sub}
Then run each in a separate terminal:
./bazel-bin/server/subspace_server
./bazel-bin/manual_tests/sub
./bazel-bin/manual_tests/pub
You can run tests directly using bazel run or bazel test. The bazel run command will build and execute the test in one step, while bazel test runs tests in test mode (useful for CI/CD).
If C++ compile actions fail with xcrun: error: invalid DEVELOPER_DIR path (/Library/Developer/CommandLineTools), missing xcrun, the active developer directory does not contain a usable toolchain (often an incomplete Command Line Tools install, or DEVELOPER_DIR set incorrectly in your shell, IDE, or CI).
1. Prefer full Xcode and point the active developer dir at it:
   sudo xcode-select -s /Applications/Xcode.app/Contents/Developer
2. Or install/repair Command Line Tools so xcrun exists under that path:
   xcode-select --install
3. If you export DEVELOPER_DIR yourself (e.g. in ~/.zshrc), remove it or set it to match xcode-select -p.
4. Optional: after (1), you can force Bazel actions to use Xcode with a user .bazelrc line:
   build --action_env=DEVELOPER_DIR=/Applications/Xcode.app/Contents/Developer
   Adjust the path if Xcode is installed elsewhere.
Note: All tests automatically start a subspace server in a separate thread, so you don't need to run the server separately. The tests handle server lifecycle management internally.
The client_test is a comprehensive test suite that validates the core client functionality including publishers, subscribers, reliable/unreliable channels, message reading modes, and more.
# Run the test
bazel run //client:client_test
# Or run as a test (better for CI)
bazel test //client:client_test
The latency_test measures message transmission latency between publishers and subscribers. This is useful for benchmarking performance.
# Run the latency test
bazel run //client:latency_test
# Run with custom options (if supported)
bazel run //client:latency_test -- --help
The stress_test performs stress testing with high message rates and multiple publishers/subscribers to verify system stability under load.
# Run the stress test (may take a while)
bazel run //client:stress_test
# Or run as a test
bazel test //client:stress_test
The Rust client tests exercise the full Rust client API against a real C++ server (started in-process via FFI), including cross-language interoperability tests that verify C++ publishers can talk to Rust subscribers and vice versa with checksums and metadata.
bazel test //rust_client:client_test
To run all tests at once:
# Run all tests
bazel test //...
# Run all tests in a specific directory
bazel test //client/...
bazel test //common/...
bazel test //rust_client/...
Subspace also supports building with CMake (version 3.15 or later). CMake uses FetchContent to automatically download and build all dependencies including Abseil, Protobuf, Googletest, cpp_toolbelt, and co.
- CMake 3.15 or later
- C++17 compatible compiler (clang or g++)
- Git (for fetching dependencies)
mkdir build
cd build
cmake ..
make
You can customize the build with CMake options:
cmake -DCMAKE_BUILD_TYPE=Release ..
make -j$(nproc)
After building, you can run the tests:
cd build
ctest
Or run individual tests:
./client/client_test
./common/common_test
# Configure and build
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Release ..
make -j$(nproc)
# Run the server in one terminal
./server/subspace_server
# Run publisher/subscriber examples in other terminals
./client/latency_test
./client/stress_test
To use Subspace in your CMake project, you can add it as a subdirectory:
# In your CMakeLists.txt
add_subdirectory(subspace)
target_link_libraries(your_target
subspace_client
subspace_common
subspace_proto
)
Or use FetchContent:
include(FetchContent)
FetchContent_Declare(
subspace
GIT_REPOSITORY https://github.com/dallison/subspace.git
GIT_TAG main # or specific tag/commit
)
FetchContent_MakeAvailable(subspace)
target_link_libraries(your_target
subspace_client
subspace_common
subspace_proto
)
The CMake build provides the following targets:
- subspace_client - Client library
- subspace_common - Common utilities library
- subspace_proto - Protocol buffer definitions library
- libserver - Server library (includes shadow replicator)
- subspace_server - Server executable
- shadow_lib - Shadow process library
- subspace_shadow - Shadow process executable
- subspace_client_rust - Rust client library (built via Cargo; requires cargo)
- client_test, latency_test, stress_test - C++ client test executables
- common_test - Common library tests
- c_client_test - C client tests
- shadow_test - Shadow process tests
- rust_client_test - Rust client tests (via cargo test; requires the server binary)
Add this to your Bazel WORKSPACE file to get access to this library without downloading it manually.
http_archive(
name = "subspace",
urls = ["https://github.com/dallison/subspace/archive/refs/tags/A.B.C.tar.gz"],
strip_prefix = "subspace-A.B.C",
)
You can also add a sha256 field to ensure a canonical build if you like. Bazel will tell you what to put in for the hash when you first build it.
Subspace provides a high-performance, shared-memory based publish/subscribe IPC system. Messages are transmitted through POSIX shared memory with sub-microsecond latency. The system supports both reliable and unreliable message delivery, allowing you to choose the appropriate semantics for your use case.
The Client class is the main entry point for using Subspace. You can create a client in two ways:
Method 1: Using Create() (recommended)
#include "client/client.h"
auto client_or = subspace::Client::Create("/tmp/subspace", "my_client");
if (!client_or.ok()) {
// Handle error
return;
}
auto client = client_or.value();
Method 2: Constructor + Init()
subspace::Client client;
auto status = client.Init("/tmp/subspace", "my_client");
if (!status.ok()) {
// Handle error
return;
}
Parameters:
- server_socket (default: "/tmp/subspace"): Path to the Unix domain socket where the Subspace server is listening
- client_name (default: ""): Optional name for this client instance
- c (optional): Pointer to a coroutine if using coroutine-aware mode
class Client {
public:
// Initialize the client by connecting to the server
absl::Status Init(const std::string &server_socket = "/tmp/subspace",
const std::string &client_name = "");
// Create a publisher for a channel
absl::StatusOr<Publisher>
CreatePublisher(const std::string &channel_name,
int slot_size,
int num_slots,
const PublisherOptions &opts = PublisherOptions());
// Create a publisher with options specifying slot size and count
absl::StatusOr<Publisher>
CreatePublisher(const std::string &channel_name,
const PublisherOptions &opts = PublisherOptions());
// Create a subscriber for a channel
absl::StatusOr<Subscriber>
CreateSubscriber(const std::string &channel_name,
const SubscriberOptions &opts = SubscriberOptions());
// Get information about channels
absl::StatusOr<const ChannelInfo> GetChannelInfo(const std::string &channelName);
absl::StatusOr<const std::vector<ChannelInfo>> GetChannelInfo();
absl::StatusOr<const ChannelStats> GetChannelStats(const std::string &channelName);
absl::StatusOr<bool> ChannelExists(const std::string &channelName);
// Enable/disable debug output
void SetDebug(bool v);
// Enable/disable thread-safe mode
void SetThreadSafe(bool v);
};
Publishers send messages to channels. You can create a publisher in two ways:
Method 1: Explicit slot size and count
auto pub_or = client->CreatePublisher("my_channel", 1024, 10);
if (!pub_or.ok()) {
// Handle error
return;
}
auto pub = pub_or.value();
Method 2: Using PublisherOptions
auto pub_or = client->CreatePublisher("my_channel",
subspace::PublisherOptions()
.SetSlotSize(1024)
.SetNumSlots(10)
.SetReliable(true));
// Get a message buffer
auto buffer_or = pub.GetMessageBuffer(1024);
if (!buffer_or.ok()) {
// Handle error (e.g., no free slots for reliable publisher)
return;
}
void* buffer = buffer_or.value();
// Fill in your message data
MyMessageType* msg = reinterpret_cast<MyMessageType*>(buffer);
msg->field1 = 42;
msg->field2 = 3.14;  // Keep fields trivially copyable; pointers and std::string are not valid across processes
// Publish the message
auto msg_info_or = pub.PublishMessage(sizeof(MyMessageType));
if (!msg_info_or.ok()) {
// Handle error
return;
}
auto msg_info = msg_info_or.value();
// msg_info.ordinal contains the message sequence number
// msg_info.timestamp contains the publish timestamp
Using GetMessageBufferSpan (C++17 style):
auto span_or = pub.GetMessageBufferSpan(1024);
if (!span_or.ok() || span_or.value().empty()) {
// Handle error
return;
}
auto span = span_or.value();
// span is an absl::Span<std::byte>
MyMessageType* msg = reinterpret_cast<MyMessageType*>(span.data());
// ... fill message ...
pub.PublishMessage(sizeof(MyMessageType));
class Publisher {
public:
// Get a message buffer for writing
absl::StatusOr<void*> GetMessageBuffer(int32_t max_size = -1, bool lock = true);
absl::StatusOr<absl::Span<std::byte>> GetMessageBufferSpan(int32_t max_size = -1, bool lock = true);
// Publish a message
absl::StatusOr<const Message> PublishMessage(int64_t message_size);
// Cancel a publish (releases lock in thread-safe mode)
void CancelPublish();
// Wait for a reliable publisher to have a free slot
absl::Status Wait(const co::Coroutine *c = nullptr);
absl::Status Wait(std::chrono::nanoseconds timeout, const co::Coroutine *c = nullptr);
absl::StatusOr<int> Wait(const toolbelt::FileDescriptor &fd, const co::Coroutine *c = nullptr);
// Get file descriptor for polling
struct pollfd GetPollFd() const;
toolbelt::FileDescriptor GetFileDescriptor() const;
const toolbelt::FileDescriptor& GetRetirementFd() const;
// Channel information
std::string Name() const;
std::string Type() const;
bool IsReliable() const;
bool IsLocal() const;
bool IsFixedSize() const;
int32_t SlotSize() const;
int32_t NumSlots() const;
// Statistics
void GetStatsCounters(uint64_t &total_bytes, uint64_t &total_messages,
uint32_t &max_message_size, uint32_t &total_drops);
// Resize callback registration
absl::Status RegisterResizeCallback(
std::function<absl::Status(Publisher*, int, int)> callback);
// Prefix area and checksum/metadata sizes
int32_t PrefixSize() const; // Total prefix bytes (multiple of 64)
int32_t ChecksumSize() const; // Bytes reserved for checksum
int32_t MetadataSize() const; // Bytes of user metadata
// Writable span over the user metadata area in the current slot's prefix.
// Call between GetMessageBuffer() and PublishMessage().
absl::Span<std::byte> GetMetadata();
// Custom checksum support
void SetChecksumCallback(ChecksumCallback cb);
void ResetChecksumCallback();
};
// Create a reliable publisher
auto pub_or = client->CreatePublisher("reliable_channel", 256, 5,
subspace::PublisherOptions().SetReliable(true));
auto pub = pub_or.value();
while (true) {
// Wait for a free slot (blocks until available)
auto status = pub.Wait();
if (!status.ok()) {
// Handle error
break;
}
// Get message buffer
auto buffer_or = pub.GetMessageBuffer(256);
if (!buffer_or.ok()) {
continue; // Should not happen after Wait()
}
// Fill and publish
MyMessage* msg = reinterpret_cast<MyMessage*>(buffer_or.value());
msg->data = compute_data();
pub.PublishMessage(sizeof(MyMessage));
}
auto sub_or = client->CreateSubscriber("my_channel");
if (!sub_or.ok()) {
// Handle error
return;
}
auto sub = sub_or.value();
Method 1: Read next message
auto msg_or = sub.ReadMessage(subspace::ReadMode::kReadNext);
if (!msg_or.ok()) {
// Handle error
return;
}
auto msg = msg_or.value();
if (msg.length == 0) {
// No message available
return;
}
// msg.buffer points to the message data
// msg.length is the message size in bytes
// msg.ordinal is the sequence number
// msg.timestamp is the publish timestamp
const MyMessageType* data = reinterpret_cast<const MyMessageType*>(msg.buffer);
Method 2: Read newest message
auto msg_or = sub.ReadMessage(subspace::ReadMode::kReadNewest);
// This skips to the most recent message, discarding older ones
Method 3: Typed read (returns shared_ptr)
auto msg_ptr_or = sub.ReadMessage<MyMessageType>();
if (!msg_ptr_or.ok() || !msg_ptr_or.value()) {
// No message or error
return;
}
auto msg_ptr = msg_ptr_or.value();
// msg_ptr is a subspace::shared_ptr<MyMessageType>
// Access data: msg_ptr->field1, (*msg_ptr).field2
// Message is automatically released when msg_ptr goes out of scope
// Wait indefinitely
auto status = sub.Wait();
if (!status.ok()) {
// Handle error
return;
}
// Wait with timeout
auto timed_status = sub.Wait(std::chrono::milliseconds(100));
if (timed_status.code() == absl::StatusCode::kDeadlineExceeded) {
// Timeout
}
// Wait with file descriptor (for integration with event loops)
toolbelt::FileDescriptor fd = /* your fd */;
auto fd_or = sub.Wait(fd);
if (fd_or.ok()) {
int triggered_fd = fd_or.value();
// Process message
}
class Subscriber {
public:
// Read messages
absl::StatusOr<Message> ReadMessage(ReadMode mode = ReadMode::kReadNext);
template <typename T>
absl::StatusOr<shared_ptr<T>> ReadMessage(ReadMode mode = ReadMode::kReadNext);
// Find message by timestamp
absl::StatusOr<Message> FindMessage(uint64_t timestamp);
template <typename T>
absl::StatusOr<shared_ptr<T>> FindMessage(uint64_t timestamp);
// Wait for messages
absl::Status Wait(const co::Coroutine *c = nullptr);
absl::Status Wait(std::chrono::nanoseconds timeout, const co::Coroutine *c = nullptr);
absl::StatusOr<int> Wait(const toolbelt::FileDescriptor &fd, const co::Coroutine *c = nullptr);
// Get file descriptor for polling
struct pollfd GetPollFd() const;
toolbelt::FileDescriptor GetFileDescriptor() const;
// Channel information
std::string Name() const;
std::string Type() const;
bool IsReliable() const;
int32_t SlotSize() const;
int32_t NumSlots() const;
int64_t GetCurrentOrdinal() const;
// Callbacks
absl::Status RegisterDroppedMessageCallback(
std::function<void(Subscriber*, int64_t)> callback);
absl::Status RegisterMessageCallback(
std::function<void(Subscriber*, Message)> callback);
absl::Status ProcessAllMessages(ReadMode mode = ReadMode::kReadNext);
// Statistics
const ChannelCounters& GetChannelCounters();
int NumActiveMessages() const;
// Prefix area and checksum/metadata sizes
int32_t PrefixSize() const; // Total prefix bytes (multiple of 64)
int32_t ChecksumSize() const; // Bytes reserved for checksum
int32_t MetadataSize() const; // Bytes of user metadata
// Read-only span over the user metadata area in the most recently
// read message's prefix. Valid while the message is active.
absl::Span<const std::byte> GetMetadata();
// Custom checksum support
void SetChecksumCallback(ChecksumCallback cb);
void ResetChecksumCallback();
};
auto sub_or = client->CreateSubscriber("my_channel",
subspace::SubscriberOptions().SetReliable(true));
auto sub = sub_or.value();
// Register callback for dropped messages
sub.RegisterDroppedMessageCallback([](subspace::Subscriber* sub, int64_t count) {
std::cerr << "Dropped " << count << " messages on " << sub->Name() << std::endl;
});
// Register callback for received messages
sub.RegisterMessageCallback([](subspace::Subscriber* sub, subspace::Message msg) {
if (msg.length > 0) {
process_message(msg);
}
});
// In your event loop
while (true) {
// Process all available messages
sub.ProcessAllMessages();
// Or wait and read manually
sub.Wait();
auto msg = sub.ReadMessage();
if (msg.ok() && msg->length > 0) {
process_message(*msg);
}
}
Reliable channels guarantee that reliable subscribers will never miss a message from reliable publishers. This is achieved through reference counting: a reliable publisher cannot reuse a slot until all reliable subscribers have released it.
Characteristics:
- Messages are never dropped for reliable subscribers
- Publishers may block if all slots are in use
- Higher memory usage (slots held until all subscribers release)
- Use Wait() to block until a slot is available
When to use:
- Critical data that must not be lost
- Control messages
- State synchronization
- Any scenario where message loss is unacceptable
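The reference-counting rule described above can be illustrated with a toy model. This is only a sketch of the idea, not Subspace's implementation: ToyReliableChannel and its methods are hypothetical names, it ignores shared memory and concurrency entirely, and it assumes every subscriber must release each slot exactly once.

```cpp
#include <vector>

// Toy model of a reliable channel: each slot carries a count of
// subscribers that have not yet released it.
class ToyReliableChannel {
 public:
  ToyReliableChannel(int num_slots, int num_subscribers)
      : refs_(num_slots, 0), num_subscribers_(num_subscribers) {}

  // Publisher side: claim the first slot nobody still holds.
  // Returns the slot index, or -1 when every slot is in use
  // (the point at which a real reliable publisher would have to wait).
  int Publish() {
    for (int i = 0; i < static_cast<int>(refs_.size()); ++i) {
      if (refs_[i] == 0) {
        refs_[i] = num_subscribers_;
        return i;
      }
    }
    return -1;
  }

  // Subscriber side: drop one reference to a slot.
  void Release(int slot) {
    if (refs_[slot] > 0) {
      --refs_[slot];
    }
  }

 private:
  std::vector<int> refs_;
  int num_subscribers_;
};
```

With two slots and two subscribers, the third Publish() fails until both subscribers have released one of the slots, which is the trade-off between guaranteed delivery and a publisher that may block.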
Example:
// Reliable publisher
auto pub = client->CreatePublisher("control", 128, 10,
subspace::PublisherOptions().SetReliable(true)).value();
// Reliable subscriber
auto sub = client->CreateSubscriber("control",
subspace::SubscriberOptions().SetReliable(true)).value();
Unreliable channels provide best-effort delivery with no guarantees. If a subscriber cannot keep up, messages may be dropped. This provides the lowest latency and highest throughput.
Characteristics:
- Messages may be dropped if subscriber is slow
- Publishers never block (always get a slot immediately)
- Lower memory usage
- Highest performance
When to use:
- High-frequency sensor data where occasional loss is acceptable
- Video/audio streaming
- Telemetry data
- Any scenario where latency is more important than reliability
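One way an application could notice drops on an unreliable channel is to watch for gaps in the message ordinals it reads. The sketch below is a standalone helper under a stated assumption: that ordinals on a channel increase by one per published message. OrdinalGapTracker is a hypothetical name and not part of the Subspace API.

```cpp
#include <cstdint>

// Tracks the last ordinal seen and counts how many were skipped.
// Assumption: consecutive messages have consecutive ordinals.
class OrdinalGapTracker {
 public:
  // Record a newly read ordinal. Returns the number of ordinals skipped
  // since the previous call (0 for the first message or when there is no gap).
  int64_t Observe(int64_t ordinal) {
    int64_t missed = 0;
    if (has_last_ && ordinal > last_ + 1) {
      missed = ordinal - last_ - 1;
    }
    last_ = ordinal;
    has_last_ = true;
    total_missed_ += missed;
    return missed;
  }

  int64_t TotalMissed() const { return total_missed_; }

 private:
  bool has_last_ = false;
  int64_t last_ = 0;
  int64_t total_missed_ = 0;
};
```

A subscriber loop would call Observe(msg.ordinal) after each successful read and treat a non-zero return as a sign that it is falling behind the publisher.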
Example:
// Unreliable publisher (default)
auto pub = client->CreatePublisher("sensor_data", 64, 100).value();
// Unreliable subscriber (default)
auto sub = client->CreateSubscriber("sensor_data").value();
You can mix reliable and unreliable publishers/subscribers on the same channel:
- Reliable subscriber + Reliable publisher: Guaranteed delivery
- Reliable subscriber + Unreliable publisher: Best effort (may drop)
- Unreliable subscriber + Reliable publisher: May drop if slow
- Unreliable subscriber + Unreliable publisher: Best effort, may drop
The PublisherOptions struct configures publisher behavior. You can use it in two ways:
auto opts = subspace::PublisherOptions()
.SetSlotSize(1024)
.SetNumSlots(10)
.SetReliable(true)
.SetLocal(false)
.SetType("MyMessageType")
.SetFixedSize(false)
.SetChecksum(true);
auto pub = client->CreatePublisher("channel", opts).value();
auto pub = client->CreatePublisher("channel",
subspace::PublisherOptions{
.slot_size = 1024,
.num_slots = 10,
.reliable = true,
.local = false,
.type = "MyMessageType",
.fixed_size = false,
.checksum = true,
.checksum_size = 4, // default CRC32
.metadata_size = 0, // no user metadata
}).value();
| Field/Method | Type | Default | Description |
|---|---|---|---|
| slot_size / SetSlotSize() | int32_t | 0 | Size of each message slot in bytes. Must be set if using options-only CreatePublisher. |
| num_slots / SetNumSlots() | int32_t | 0 | Number of slots in the channel. Must be set if using options-only CreatePublisher. |
| reliable / SetReliable() | bool | false | If true, reliable delivery (see Reliable Channels section). |
| local / SetLocal() | bool | false | If true, messages are only visible on the local machine (not bridged). |
| type / SetType() | std::string | "" | User-defined message type identifier. All publishers/subscribers must use the same type. |
| fixed_size / SetFixedSize() | bool | false | If true, prevents automatic resizing of slots. |
| bridge / SetBridge() | bool | false | Internal: marks this as a bridge publisher. |
| mux / SetMux() | std::string | "" | Multiplexer name for virtual channels. |
| vchan_id / SetVchanId() | int | -1 | Virtual channel ID (-1 for server-assigned). |
| activate / SetActivate() | bool | false | If true, channel is activated even if unreliable. |
| notify_retirement / SetNotifyRetirement() | bool | false | If true, notify when slots are retired. |
| checksum / SetChecksum() | bool | false | If true, calculate checksums for all messages. |
| checksum_size / SetChecksumSize() | int32_t | 4 | Number of bytes reserved for the checksum (starting at the checksum field of MessagePrefix). Default 4 for CRC32. Increase for larger checksums (e.g. 20 for SHA-1). |
| metadata_size / SetMetadataSize() | int32_t | 0 | Number of bytes of user metadata stored immediately after the checksum area. Accessible via Publisher::GetMetadata() / Subscriber::GetMetadata(). |
Getter Methods:
- int32_t SlotSize() const
- int32_t NumSlots() const
- bool IsReliable() const
- bool IsLocal() const
- bool IsFixedSize() const
- const std::string& Type() const
- bool IsBridge() const
- const std::string& Mux() const
- int VchanId() const
- bool Activate() const
- bool NotifyRetirement() const
- bool Checksum() const
- int32_t ChecksumSize() const
- int32_t MetadataSize() const
Example: Creating a reliable publisher with checksums
auto pub = client->CreatePublisher("secure_channel", 512, 20,
subspace::PublisherOptions()
.SetReliable(true)
.SetChecksum(true)
.SetType("SecureMessage")).value();
Each message slot has a prefix area preceding the message buffer. The prefix
contains the MessagePrefix struct (ordinal, timestamp, size, flags, etc.) whose
checksum field marks the start of the checksum storage. The total prefix
area is always aligned to a 64-byte boundary and its size is determined by:
prefix_size = align_up(offsetof(MessagePrefix, checksum) + checksum_size + metadata_size, 64)
With the defaults (checksum_size = 4, metadata_size = 0) the prefix area
is 64 bytes — the MessagePrefix itself. Increasing either value causes the
prefix to grow in 64-byte increments.
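The sizing rule above can be checked with a few lines of arithmetic. The sketch below is a standalone helper, not part of the Subspace API: AlignUp and PrefixSize are hypothetical names, and the checksum offset is passed in as a parameter because the real value of offsetof(MessagePrefix, checksum) depends on the library's struct layout. The offset of 60 mentioned afterwards is purely an illustrative assumption.

```cpp
#include <cstdint>

// Round value up to the next multiple of alignment (alignment must be > 0).
constexpr int32_t AlignUp(int32_t value, int32_t alignment) {
  return (value + alignment - 1) / alignment * alignment;
}

// Hypothetical helper mirroring the documented formula.
// checksum_offset stands in for offsetof(MessagePrefix, checksum).
constexpr int32_t PrefixSize(int32_t checksum_offset, int32_t checksum_size,
                             int32_t metadata_size) {
  return AlignUp(checksum_offset + checksum_size + metadata_size, 64);
}
```

If the checksum field sat at offset 60, PrefixSize(60, 4, 0) would give 64, and either a 20-byte checksum or 16 bytes of metadata would push the prefix to 128 bytes.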
When checksums are enabled (SetChecksum(true)), the built-in CRC32 writes a
4-byte checksum at the start of the checksum area. If you need a larger
checksum (e.g. 20 bytes for SHA-1), set checksum_size accordingly:
auto pub = client->CreatePublisher("channel",
subspace::PublisherOptions()
.SetSlotSize(1024)
.SetNumSlots(10)
.SetChecksum(true)
.SetChecksumSize(20)).value();
The metadata area sits immediately after the checksum area in the prefix.
Set metadata_size on the publisher to reserve space:
auto pub = client->CreatePublisher("channel",
subspace::PublisherOptions()
.SetSlotSize(1024)
.SetNumSlots(10)
.SetMetadataSize(16)).value();
// Write metadata between GetMessageBuffer() and PublishMessage():
auto buffer = pub.GetMessageBuffer(128).value();
auto meta = pub.GetMetadata(); // absl::Span<std::byte>, 16 bytes
memcpy(meta.data(), my_metadata, 16);
pub.PublishMessage(128);
Subscribers read metadata from the most recently read message:
auto msg = sub.ReadMessage().value();
auto meta = sub.GetMetadata(); // absl::Span<const std::byte>, 16 bytes
If you need a custom checksum algorithm, you can provide a callback that
replaces the built-in CRC32. The callback receives the data to checksum
(as three spans covering the prefix header, the prefix extension, and the
message body) plus a writable
absl::Span<std::byte> of ChecksumSize() bytes where it should write
the result:
using ChecksumCallback =
std::function<void(const std::array<absl::Span<const uint8_t>, 3> &data,
absl::Span<std::byte> checksum)>;
Example: Simple additive checksum (4 bytes)
auto fake_crc = [](const std::array<absl::Span<const uint8_t>, 3> &data,
absl::Span<std::byte> checksum) {
uint32_t sum = 0;
for (const auto &span : data) {
for (uint8_t byte : span) {
sum += byte;
}
}
*reinterpret_cast<uint32_t *>(checksum.data()) = sum;
};
pub.SetChecksumCallback(fake_crc);
sub.SetChecksumCallback(fake_crc);
Example: 20-byte custom checksum (requires checksum_size = 20)
auto sha1_like = [](const std::array<absl::Span<const uint8_t>, 3> &data,
absl::Span<std::byte> checksum) {
// checksum.size() == 20
// Write your 20-byte digest into checksum.data()
my_sha1(data, checksum.data(), checksum.size());
};
pub.SetChecksumCallback(sha1_like);
sub.SetChecksumCallback(sha1_like);
Call ResetChecksumCallback() to revert to the built-in CRC32.
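The core of an additive checksum callback can be tried out without linking against Subspace or Abseil. The following is a dependency-free sketch: ByteView is a hypothetical stand-in for absl::Span<const uint8_t>, and AdditiveChecksum and StoreChecksum are illustrative names rather than library functions. It stores the result with memcpy, which avoids assuming that the checksum area is suitably aligned for a uint32_t.

```cpp
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Stand-in for absl::Span<const uint8_t> so the sketch has no dependencies.
struct ByteView {
  const uint8_t *data;
  size_t size;
};

// Sum every byte across the three regions
// (prefix header, prefix extension, message body).
uint32_t AdditiveChecksum(const std::array<ByteView, 3> &regions) {
  uint32_t sum = 0;
  for (const ByteView &region : regions) {
    for (size_t i = 0; i < region.size; ++i) {
      sum += region.data[i];
    }
  }
  return sum;
}

// Copy the result into the checksum area without assuming alignment.
// Writes at most out_size bytes and returns the number of bytes written.
size_t StoreChecksum(uint32_t sum, uint8_t *out, size_t out_size) {
  const size_t n = std::min(sizeof(sum), out_size);
  std::memcpy(out, &sum, n);
  return n;
}
```

Inside a real callback the same two steps would operate on the spans passed in by the library, and the publisher and subscriber must install matching callbacks so that both sides compute the same value.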
The SubscriberOptions struct configures subscriber behavior. Like PublisherOptions, it supports both chained setters and designated initializers.
auto opts = subspace::SubscriberOptions()
.SetReliable(true)
.SetType("MyMessageType")
.SetMaxActiveMessages(10)
.SetChecksum(true)
.SetPassChecksumErrors(false);
auto sub = client->CreateSubscriber("channel", opts).value();
auto sub = client->CreateSubscriber("channel",
subspace::SubscriberOptions{
.reliable = true,
.type = "MyMessageType",
.max_active_messages = 10,
.checksum = true,
.pass_checksum_errors = false
}).value();
| Field/Method | Type | Default | Description |
|---|---|---|---|
| reliable / SetReliable() | bool | false | If true, reliable delivery (see Reliable Channels section). |
| type / SetType() | std::string | "" | User-defined message type identifier. Must match publisher type. |
| max_active_messages / SetMaxActiveMessages() | int | 1 | Maximum number of active messages (shared_ptrs) that can be held simultaneously. |
| max_active_messages / SetMaxSharedPtrs() | int | 0 | Alias: sets max_active_messages to n+1. |
| log_dropped_messages / SetLogDroppedMessages() | bool | true | If true, log when messages are dropped. |
| bridge / SetBridge() | bool | false | Internal: marks this as a bridge subscriber. |
| mux / SetMux() | std::string | "" | Multiplexer name for virtual channels. |
| vchan_id / SetVchanId() | int | -1 | Virtual channel ID (-1 for server-assigned). |
| pass_activation / SetPassActivation() | bool | false | If true, activation messages are passed to the user. |
| read_write / SetReadWrite() | bool | false | If true, map buffers as read-write instead of read-only. |
| checksum / SetChecksum() | bool | false | If true, verify checksums on received messages. |
| pass_checksum_errors / SetPassChecksumErrors() | bool | false | If true, pass messages with checksum errors (with flag set). If false, return error. |
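The interaction between the checksum and pass_checksum_errors options can be summarised as a small decision function. This is a sketch of the documented behavior only, not the library's code: ReadOutcome and ApplyChecksumPolicy are hypothetical names, and the stored and computed checksums are modelled as plain 32-bit values.

```cpp
#include <cstdint>

// Possible results of reading one message, as seen by the application.
enum class ReadOutcome {
  kDelivered,                   // message returned normally
  kDeliveredWithChecksumError,  // message returned with an error flag set
  kError                        // read fails with an error
};

// verify      corresponds to the checksum option
// pass_errors corresponds to the pass_checksum_errors option
ReadOutcome ApplyChecksumPolicy(bool verify, bool pass_errors,
                                uint32_t stored, uint32_t computed) {
  if (!verify || stored == computed) {
    return ReadOutcome::kDelivered;
  }
  return pass_errors ? ReadOutcome::kDeliveredWithChecksumError
                     : ReadOutcome::kError;
}
```

Passing errors through is useful when the application wants to log or inspect corrupted messages itself; leaving the option off keeps corrupted data out of the normal read path.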
Getter Methods:
- bool IsReliable() const
- const std::string& Type() const
- int MaxActiveMessages() const
- int MaxSharedPtrs() const
- bool LogDroppedMessages() const
- bool IsBridge() const
- const std::string& Mux() const
- int VchanId() const
- bool PassActivation() const
- bool ReadWrite() const
- bool Checksum() const
- bool PassChecksumErrors() const
Example: Creating a reliable subscriber with checksum verification
auto sub = client->CreateSubscriber("secure_channel",
subspace::SubscriberOptions()
.SetReliable(true)
.SetChecksum(true)
.SetPassChecksumErrors(false) // Return error on checksum failure
.SetType("SecureMessage")
.SetMaxActiveMessages(5)).value();
Here's a complete example showing publisher and subscriber:
#include "client/client.h"
#include <chrono>   // std::chrono::steady_clock
#include <cstdint>  // uint64_t
#include <iostream>
struct SensorData {
double temperature;
double pressure;
uint64_t timestamp;
};
int main() {
// Create client
auto client_or = subspace::Client::Create("/tmp/subspace", "sensor_app");
if (!client_or.ok()) {
std::cerr << "Failed to create client: " << client_or.status() << std::endl;
return 1;
}
auto client = client_or.value();
// Create reliable publisher
auto pub_or = client->CreatePublisher("sensors", sizeof(SensorData), 10,
subspace::PublisherOptions()
.SetReliable(true)
.SetType("SensorData"));
if (!pub_or.ok()) {
std::cerr << "Failed to create publisher: " << pub_or.status() << std::endl;
return 1;
}
auto pub = pub_or.value();
// Create reliable subscriber
auto sub_or = client->CreateSubscriber("sensors",
subspace::SubscriberOptions()
.SetReliable(true)
.SetType("SensorData"));
if (!sub_or.ok()) {
std::cerr << "Failed to create subscriber: " << sub_or.status() << std::endl;
return 1;
}
auto sub = sub_or.value();
// Publisher loop
for (int i = 0; i < 100; ++i) {
// Wait for free slot
pub.Wait();
// Get buffer
auto buffer_or = pub.GetMessageBuffer(sizeof(SensorData));
if (!buffer_or.ok()) continue;
// Fill message
SensorData* data = reinterpret_cast<SensorData*>(buffer_or.value());
data->temperature = 20.0 + i * 0.1;
data->pressure = 1013.25;
data->timestamp = std::chrono::steady_clock::now().time_since_epoch().count();
// Publish
auto msg_or = pub.PublishMessage(sizeof(SensorData));
if (msg_or.ok()) {
std::cout << "Published message " << msg_or->ordinal << std::endl;
}
}
// Subscriber loop
for (int i = 0; i < 100; ++i) {
// Wait for message
sub.Wait();
// Read message
auto msg_or = sub.ReadMessage<SensorData>();
if (!msg_or.ok() || !msg_or.value()) {
continue;
}
auto msg = msg_or.value();
std::cout << "Received: temp=" << msg->temperature
<< ", pressure=" << msg->pressure
<< ", ordinal=" << msg.GetMessage().ordinal << std::endl;
}
return 0;
}
Subspace provides a C API (c_client/subspace.h) for applications that need to use Subspace from C code or integrate it into other language bindings. The C API is simpler and has fewer dependencies than the C++ API, making it easier to integrate into projects that don't use C++.
The C API uses a thread-local error mechanism similar to errno. Most functions return a boolean indicating success (true) or failure (false). When a function fails, you can check for errors and retrieve the error message:
#include "c_client/subspace.h"
// Check if there was an error
if (subspace_has_error()) {
// Get the error message
char* error = subspace_get_last_error();
fprintf(stderr, "Error: %s\n", error);
}
The error message is a static string owned by the library and is thread-local (one error message per thread).
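The errno-like pattern described here (return a boolean, keep the message in thread-local storage) can be sketched in a few lines. This is an illustration of the pattern only, written in C++ for brevity: the toy namespace and its functions are hypothetical and are not how the Subspace C client is necessarily implemented.

```cpp
#include <string>

namespace toy {

// One error message per thread, owned by the "library".
thread_local std::string last_error;

bool HasError() { return !last_error.empty(); }

// The returned pointer stays valid until the next call on this thread
// that modifies the error message.
const char *GetLastError() { return last_error.c_str(); }

void ClearError() { last_error.clear(); }

// Example of a fallible call: returns false and records a message on failure.
bool OpenChannel(const char *name) {
  ClearError();
  if (name == nullptr || name[0] == '\0') {
    last_error = "channel name is empty";
    return false;
  }
  return true;
}

}  // namespace toy
```

Because the storage is thread-local, a failure on one thread never overwrites the message another thread is about to read, which is why callers should fetch the message on the same thread that made the failing call.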
// Create client with default socket ("/tmp/subspace") and no name
SubspaceClient client = subspace_create_client();
// Create client with custom socket
SubspaceClient client = subspace_create_client_with_socket("/tmp/my_subspace");
// Create client with socket and name
SubspaceClient client = subspace_create_client_with_socket_and_name(
"/tmp/subspace", "my_client_name");
// Check if client was created successfully
if (client.client == NULL) {
fprintf(stderr, "Failed to create client: %s\n", subspace_get_last_error());
return 1;
}
// Clean up when done
subspace_remove_client(&client);
Publisher Options:
// Create default publisher options
SubspacePublisherOptions pub_opts = subspace_publisher_options_default(1024, 10);
// pub_opts.slot_size = 1024
// pub_opts.num_slots = 10
// pub_opts.reliable = false
// pub_opts.fixed_size = false
// pub_opts.activate = false
// pub_opts.checksum_size = 4 (CRC32)
// pub_opts.metadata_size = 0 (no user metadata)
// Customize options
pub_opts.reliable = true;
pub_opts.fixed_size = false;
pub_opts.type.type = "MyMessageType";
pub_opts.type.type_length = strlen(pub_opts.type.type);
pub_opts.checksum_size = 20; // e.g. 20-byte digest
pub_opts.metadata_size = 32; // 32 bytes of user metadata
// Create publisher
SubspacePublisher pub = subspace_create_publisher(client, "my_channel", pub_opts);
if (pub.publisher == NULL) {
fprintf(stderr, "Failed to create publisher: %s\n", subspace_get_last_error());
return 1;
}
Subscriber Options:
// Create default subscriber options
SubspaceSubscriberOptions sub_opts = subspace_subscriber_options_default();
// sub_opts.reliable = false
// sub_opts.max_active_messages = 1
// sub_opts.pass_activation = false
// sub_opts.log_dropped_messages = false
// Customize options
sub_opts.reliable = true;
sub_opts.max_active_messages = 10;
sub_opts.type.type = "MyMessageType";
sub_opts.type.type_length = strlen(sub_opts.type.type);
// Create subscriber
SubspaceSubscriber sub = subspace_create_subscriber(client, "my_channel", sub_opts);
if (sub.subscriber == NULL) {
fprintf(stderr, "Failed to create subscriber: %s\n", subspace_get_last_error());
return 1;
}
// Get a message buffer
SubspaceMessageBuffer buffer = subspace_get_message_buffer(pub, 1024);
if (buffer.buffer == NULL) {
// For reliable publishers, you may need to wait
if (pub_opts.reliable) {
subspace_wait_for_publisher(pub);
buffer = subspace_get_message_buffer(pub, 1024);
} else {
fprintf(stderr, "Failed to get buffer: %s\n", subspace_get_last_error());
return 1;
}
}
// Fill in your message data
MyMessageType* msg = (MyMessageType*)buffer.buffer;
msg->field1 = 42;
msg->field2 = 3.14;
// Publish the message
const SubspaceMessage pub_status = subspace_publish_message(pub, sizeof(MyMessageType));
if (pub_status.length == 0) {
fprintf(stderr, "Failed to publish: %s\n", subspace_get_last_error());
return 1;
}
// pub_status.ordinal contains the message sequence number
// pub_status.timestamp contains the publish timestamp
// Read next message
SubspaceMessage msg = subspace_read_message(sub);
if (msg.length == 0) {
// No message available
// For reliable subscribers, you may want to wait
if (sub_opts.reliable) {
subspace_wait_for_subscriber(sub);
msg = subspace_read_message(sub);
}
}
if (msg.length > 0) {
// Process the message
const MyMessageType* data = (const MyMessageType*)msg.buffer;
printf("Received message ordinal: %llu\n", (unsigned long long)msg.ordinal);
printf("Message timestamp: %llu\n", (unsigned long long)msg.timestamp);
// IMPORTANT: Free the message when done
subspace_free_message(&msg);
}
// Read newest message (skips to most recent)
SubspaceMessage newest = subspace_read_message_with_mode(sub, kSubspaceReadNewest);
if (newest.length > 0) {
// Process message
subspace_free_message(&newest);
}
Important: You must call subspace_free_message() when done with a message. The max_active_messages option determines how many messages you can hold simultaneously. If you don't free messages, the subscriber will run out of slots and be unable to read more messages.
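The effect of the max_active_messages cap can be pictured with a toy model. The sketch below is not how Subspace tracks slots internally; the `Demo*` types and functions are hypothetical and only show why a held message blocks further reads until it is freed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model only: hypothetical types that do not mirror Subspace internals. */
typedef struct {
  int max_active_messages; /* cap on messages held at once */
  int active;              /* messages currently held by the application */
} DemoSubscriber;

typedef struct {
  DemoSubscriber *owner; /* NULL when the handle holds no message */
} DemoMessage;

/* Hands out a message only while the subscriber is below its cap. */
static DemoMessage demo_read(DemoSubscriber *sub) {
  DemoMessage msg = {NULL};
  assert(sub != NULL);
  if (sub->active < sub->max_active_messages) {
    sub->active++;
    msg.owner = sub;
  }
  return msg; /* owner == NULL means no message could be handed out */
}

/* Releases a held message; returns false if there was nothing to release. */
static bool demo_free(DemoMessage *msg) {
  if (msg->owner == NULL) return false;
  assert(msg->owner->active > 0);
  msg->owner->active--;
  msg->owner = NULL; /* makes a second free a harmless no-op */
  return true;
}
```

With a cap of 2, a third `demo_read()` fails until one of the two held messages is passed to `demo_free()`, which mirrors the advice to free each message as soon as you are done with it.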
// Wait indefinitely for a message
if (!subspace_wait_for_subscriber(sub)) {
fprintf(stderr, "Wait failed: %s\n", subspace_get_last_error());
return 1;
}
// Wait with file descriptor (for integration with event loops)
int fd = /* your file descriptor */;
int triggered_fd = subspace_wait_for_subscriber_with_fd(sub, fd);
if (triggered_fd < 0) {
fprintf(stderr, "Wait failed: %s\n", subspace_get_last_error());
return 1;
}
The C API provides file descriptors that can be used with poll(), epoll(), or other event notification mechanisms:
// Get pollfd structure for subscriber
struct pollfd pfd = subspace_get_subscriber_poll_fd(sub);
// pfd.fd is the file descriptor
// pfd.events should be set to POLLIN
// Use in poll() call
int ret = poll(&pfd, 1, timeout_ms);
if (ret > 0 && (pfd.revents & POLLIN)) {
// Message available, read it
SubspaceMessage msg = subspace_read_message(sub);
// ... process message ...
subspace_free_message(&msg);
}
// Or get the raw file descriptor
int fd = subspace_get_subscriber_fd(sub);
// Use fd with epoll, select, etc.
The C API supports callbacks for message reception and dropped messages:
// Message callback
void message_callback(SubspaceSubscriber sub, SubspaceMessage msg) {
if (msg.length > 0) {
printf("Received message of size %zu\n", msg.length);
// Process message
// IMPORTANT: Free the message when done
subspace_free_message(&msg);
}
}
// Register callback
if (!subspace_register_subscriber_callback(sub, message_callback)) {
fprintf(stderr, "Failed to register callback: %s\n", subspace_get_last_error());
return 1;
}
// Process all available messages (calls the callback for each)
subspace_process_all_messages(sub);
// Unregister callback
subspace_remove_subscriber_callback(sub);
// Dropped message callback
void dropped_callback(SubspaceSubscriber sub, int64_t count) {
fprintf(stderr, "Dropped %lld messages\n", (long long)count);
}
subspace_register_dropped_message_callback(sub, dropped_callback);
#include "c_client/subspace.h"
#include <stdio.h>
#include <string.h>
// The typedef lets the code below write both SensorData and struct SensorData
typedef struct SensorData {
double temperature;
double pressure;
uint64_t timestamp;
} SensorData;
int main() {
// Create client
SubspaceClient client = subspace_create_client();
if (client.client == NULL) {
fprintf(stderr, "Failed to create client: %s\n", subspace_get_last_error());
return 1;
}
// Create reliable publisher
SubspacePublisherOptions pub_opts = subspace_publisher_options_default(
sizeof(SensorData), 10);
pub_opts.reliable = true;
pub_opts.type.type = "SensorData";
pub_opts.type.type_length = strlen(pub_opts.type.type);
SubspacePublisher pub = subspace_create_publisher(client, "sensors", pub_opts);
if (pub.publisher == NULL) {
fprintf(stderr, "Failed to create publisher: %s\n", subspace_get_last_error());
return 1;
}
// Create reliable subscriber
SubspaceSubscriberOptions sub_opts = subspace_subscriber_options_default();
sub_opts.reliable = true;
sub_opts.type.type = "SensorData";
sub_opts.type.type_length = strlen(sub_opts.type.type);
SubspaceSubscriber sub = subspace_create_subscriber(client, "sensors", sub_opts);
if (sub.subscriber == NULL) {
fprintf(stderr, "Failed to create subscriber: %s\n", subspace_get_last_error());
return 1;
}
// Publisher loop
for (int i = 0; i < 100; ++i) {
// Wait for free slot (reliable publisher)
subspace_wait_for_publisher(pub);
// Get buffer
SubspaceMessageBuffer buffer = subspace_get_message_buffer(pub, sizeof(SensorData));
if (buffer.buffer == NULL) {
continue;
}
// Fill message
struct SensorData* data = (struct SensorData*)buffer.buffer;
data->temperature = 20.0 + i * 0.1;
data->pressure = 1013.25;
data->timestamp = 0; /* placeholder: fill in the current time from your clock source */
// Publish
const SubspaceMessage pub_status = subspace_publish_message(pub, sizeof(SensorData));
if (pub_status.length > 0) {
printf("Published message %llu\n", (unsigned long long)pub_status.ordinal);
}
}
// Subscriber loop
for (int i = 0; i < 100; ++i) {
// Wait for message
subspace_wait_for_subscriber(sub);
// Read message
SubspaceMessage msg = subspace_read_message(sub);
if (msg.length > 0) {
const struct SensorData* data = (const struct SensorData*)msg.buffer;
printf("Received: temp=%.2f, pressure=%.2f, ordinal=%llu\n",
data->temperature, data->pressure, (unsigned long long)msg.ordinal);
subspace_free_message(&msg);
}
}
// Cleanup
subspace_remove_subscriber(&sub);
subspace_remove_publisher(&pub);
subspace_remove_client(&client);
return 0;
}
Client Functions:
- SubspaceClient subspace_create_client(void)
- SubspaceClient subspace_create_client_with_socket(const char *socket_name)
- SubspaceClient subspace_create_client_with_socket_and_name(const char *socket_name, const char *client_name)
- bool subspace_remove_client(SubspaceClient *client)
Publisher Functions:
- SubspacePublisherOptions subspace_publisher_options_default(int32_t slot_size, int num_slots)
- SubspacePublisher subspace_create_publisher(SubspaceClient client, const char *channel_name, SubspacePublisherOptions options)
- SubspaceMessageBuffer subspace_get_message_buffer(SubspacePublisher publisher, size_t max_size)
- const SubspaceMessage subspace_publish_message(SubspacePublisher publisher, size_t messageSize)
- bool subspace_wait_for_publisher(SubspacePublisher publisher)
- int subspace_wait_for_publisher_with_fd(SubspacePublisher publisher, int fd)
- struct pollfd subspace_get_publisher_poll_fd(SubspacePublisher publisher)
- int subspace_get_publisher_fd(SubspacePublisher publisher)
- bool subspace_register_resize_callback(SubspacePublisher publisher, bool (*callback)(SubspacePublisher, int32_t, int32_t))
- bool subspace_unregister_resize_callback(SubspacePublisher publisher)
- bool subspace_remove_publisher(SubspacePublisher *publisher)
Subscriber Functions:
- SubspaceSubscriberOptions subspace_subscriber_options_default(void)
- SubspaceSubscriber subspace_create_subscriber(SubspaceClient client, const char *channel_name, SubspaceSubscriberOptions options)
- SubspaceMessage subspace_read_message(SubspaceSubscriber subscriber)
- SubspaceMessage subspace_read_message_with_mode(SubspaceSubscriber subscriber, SubspaceReadMode mode)
- bool subspace_free_message(SubspaceMessage *message)
- bool subspace_wait_for_subscriber(SubspaceSubscriber subscriber)
- int subspace_wait_for_subscriber_with_fd(SubspaceSubscriber subscriber, int fd)
- struct pollfd subspace_get_subscriber_poll_fd(SubspaceSubscriber subscriber)
- int subspace_get_subscriber_fd(SubspaceSubscriber subscriber)
- int32_t subspace_get_subscriber_slot_size(SubspaceSubscriber subscriber)
- int subspace_get_subscriber_num_slots(SubspaceSubscriber subscriber)
- SubspaceTypeInfo subspace_get_subscriber_type(SubspaceSubscriber subscriber)
- bool subspace_register_subscriber_callback(SubspaceSubscriber subscriber, void (*callback)(SubspaceSubscriber, SubspaceMessage))
- bool subspace_remove_subscriber_callback(SubspaceSubscriber subscriber)
- bool subspace_register_dropped_message_callback(SubspaceSubscriber subscriber, void (*callback)(SubspaceSubscriber, int64_t))
- bool subspace_remove_dropped_message_callback(SubspaceSubscriber subscriber)
- bool subspace_process_all_messages(SubspaceSubscriber subscriber)
- bool subspace_remove_subscriber(SubspaceSubscriber *subscriber)
Error Functions:
- char* subspace_get_last_error(void)
- bool subspace_has_error(void)
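The descriptor-returning functions listed above (subspace_get_subscriber_fd, subspace_get_subscriber_poll_fd and their publisher counterparts) exist so that a channel can be watched from an ordinary event loop. The sketch below shows the generic poll() pattern with a timeout. So that it runs without a Subspace server, it makes no Subspace calls; any readable descriptor, such as the read end of a pipe, can stand in for the subscriber descriptor. The `demo_*` helpers are hypothetical.

```c
#include <assert.h>
#include <poll.h>
#include <stdbool.h>
#include <unistd.h>

/* Hypothetical helpers for illustration; not part of the Subspace C API. */

/* Waits up to timeout_ms for fd to become readable.
 * Returns 1 if readable, 0 on timeout, -1 on error or hangup. */
static int demo_wait_readable(int fd, int timeout_ms) {
  assert(fd >= 0);
  struct pollfd pfd;
  pfd.fd = fd;
  pfd.events = POLLIN;
  pfd.revents = 0;
  int ret = poll(&pfd, 1, timeout_ms);
  if (ret < 0) return -1; /* poll() itself failed; errno has the reason */
  if (ret == 0) return 0; /* nothing happened within the timeout */
  return (pfd.revents & POLLIN) ? 1 : -1;
}

/* Stand-in for handling the event in this pipe-based demo: consumes one
 * pending byte. With Subspace you would call subspace_read_message()
 * at this point instead of reading the descriptor yourself. */
static bool demo_consume_event(int fd) {
  char byte;
  return read(fd, &byte, 1) == 1;
}
```

In a real program the descriptor obtained from subspace_get_subscriber_fd() would take the place of the pipe, and the same structure extends to several channels by passing an array of pollfd entries to poll().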
Subspace includes a native Rust client library (rust_client/) that communicates
with the same C++ server and shares the same shared-memory layout as the C++
client. Rust publishers and C++ subscribers (and vice versa) can exchange
messages on the same channels, including checksums and user metadata --
cross-language interoperability is covered by automated tests.
use subspace_client::{Client, ReadMode};
use subspace_client::options::{PublisherOptions, SubscriberOptions};
// Connect to the server.
let client = Client::new("/tmp/subspace", "my_app")?;
// Create a publisher.
let pub_opts = PublisherOptions::new()
.set_slot_size(1024)
.set_num_slots(10);
let publisher = client.create_publisher("sensor_data", &pub_opts)?;
// Publish a message.
let (buf, _cap) = publisher.get_message_buffer(64)?.unwrap();
unsafe { std::ptr::copy_nonoverlapping(b"hello".as_ptr(), buf, 5); }
publisher.publish_message(5)?;
// Create a subscriber (in another client or the same one).
let sub_opts = SubscriberOptions::new();
let subscriber = client.create_subscriber("sensor_data", &sub_opts)?;
// Read a message.
let msg = subscriber.read_message(ReadMode::ReadNext)?;
assert_eq!(msg.length, 5);
- Full pub/sub support: unreliable and reliable channels, read-next and read-newest modes, activation messages, virtual channels.
- Checksums (built-in CRC32 or custom callbacks) and per-message user metadata.
- File-descriptor-based wait() for integration with event loops and poll().
- Slot retirement notification for reliable publishers.
- Runs on Linux and macOS (ARM64 and x86_64).
# Build the library
bazel build //rust_client:subspace_client_rust
# Run the tests (starts an in-process C++ server via FFI)
bazel test //rust_client:client_test
rust_binary(
name = "my_app",
deps = ["@subspace//rust_client:subspace_client_rust"],
)
See docs/rust-client.md for the full API reference and usage guide.
Subspace is message-type agnostic. You can send any data structure as long as it fits in the slot size. Common approaches:
- Plain C structs (as shown above) - fastest, no serialization overhead
- Protocol Buffers - cross-language, versioned
- Zero-copy facilities like Phaser or Neutron - zero-copy, schema evolution
- JSON - human-readable, flexible
- Custom binary formats
The type field in PublisherOptions and SubscriberOptions is purely for application-level type checking - Subspace doesn't validate or enforce it.
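Since Subspace neither serializes the payload nor enforces the type string, both checks are left to the application. The sketch below shows one way to do them for the plain C struct approach: a bounds-checked copy into a slot-sized buffer, and a comparison of a (pointer, length) type string like the one carried in the options. All `demo_*` and `Demo*` names are hypothetical, and the buffer here is an ordinary array rather than a Subspace slot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical message layout used only for this illustration. */
typedef struct {
  double temperature;
  double pressure;
  uint64_t timestamp;
} DemoSensorData;

/* Copies a payload into a slot buffer, refusing payloads larger than the
 * slot. Returns the number of bytes written, or 0 if it does not fit. */
static size_t demo_pack(void *slot, size_t slot_size, const void *payload,
                        size_t payload_size) {
  assert(slot != NULL && payload != NULL);
  if (payload_size > slot_size) return 0;
  memcpy(slot, payload, payload_size);
  return payload_size;
}

/* Application-level type check on a (pointer, length) type string. */
static bool demo_type_matches(const char *type, size_t type_length,
                              const char *expected) {
  assert(expected != NULL);
  size_t expected_length = strlen(expected);
  return type != NULL && type_length == expected_length &&
         memcmp(type, expected, expected_length) == 0;
}
```

A subscriber could run a check like `demo_type_matches()` once after creation and refuse to interpret payloads from a channel whose type string is not the one it was built for.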
By default, the Client class is not thread-safe. To enable thread-safe mode:
client->SetThreadSafe(true);
In thread-safe mode:
- GetMessageBuffer() acquires a lock that is held until PublishMessage() or CancelPublish() is called
- You must call PublishMessage() or CancelPublish() after GetMessageBuffer()
- Multiple threads can safely use the same client instance
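The locking discipline in the list above, where one call takes a lock and a later call releases it, can be modeled in a few lines. This is a sketch in C with a pthread mutex, not the C++ client's actual implementation; the `Demo*` type and `demo_*` functions are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of a publisher whose buffer is guarded by a lock. */
typedef struct {
  pthread_mutex_t lock;
  bool buffer_outstanding; /* true between get_buffer and publish/cancel */
  char slot[64];
  size_t published_length;
} DemoPublisher;

static void demo_publisher_init(DemoPublisher *pub) {
  pthread_mutex_init(&pub->lock, NULL);
  pub->buffer_outstanding = false;
  pub->published_length = 0;
}

/* Acquires the lock and keeps holding it: the caller now owns the slot. */
static void *demo_get_buffer(DemoPublisher *pub) {
  pthread_mutex_lock(&pub->lock);
  pub->buffer_outstanding = true;
  return pub->slot;
}

/* Completes the publish and releases the lock taken by demo_get_buffer. */
static void demo_publish(DemoPublisher *pub, size_t length) {
  assert(pub->buffer_outstanding && length <= sizeof(pub->slot));
  pub->published_length = length;
  pub->buffer_outstanding = false;
  pthread_mutex_unlock(&pub->lock);
}

/* Abandons the buffer without publishing; also releases the lock. */
static void demo_cancel(DemoPublisher *pub) {
  assert(pub->buffer_outstanding);
  pub->buffer_outstanding = false;
  pthread_mutex_unlock(&pub->lock);
}
```

The model makes the second bullet concrete: if a thread returns from `demo_get_buffer()` and never calls `demo_publish()` or `demo_cancel()`, every other thread that asks for a buffer blocks forever.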
Subspace is coroutine-aware. If you pass a coroutine pointer when creating the client, blocking operations will yield to other coroutines:
co::CoroutineScheduler scheduler;
co::Coroutine* co = scheduler.CreateCoroutine([]() {
auto client = subspace::Client::Create("/tmp/subspace", "co_client",
co::Coroutine::Current()).value();
// ... use client ...
});
scheduler.Run();
When using coroutines, Wait() operations will yield instead of blocking the thread.
Subspace supports a shadow process that mirrors the server's channel, publisher, and subscriber state. If the server crashes and restarts, it can reconnect to the shadow, reload the full state, and resume operation without losing shared-memory buffers. Existing clients can then reclaim their publishers and subscribers.
The shadow is a lightweight, coroutine-based daemon that maintains a copy of
the server's channel database. It communicates with the server over a Unix
domain socket using protobuf-encoded ShadowEvent messages. Shared-memory
file descriptors (SCB, CCB, BCB, trigger, and retirement FDs) are passed once
using SCM_RIGHTS so the shadow holds them open even if the server dies.
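The SCM_RIGHTS mechanism mentioned above is a standard feature of Unix domain sockets. The sketch below shows only that generic mechanism, sending one descriptor as ancillary data and receiving it on the other side. It does not reproduce the ShadowEvent protocol, and the `demo_*` helpers are hypothetical.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h> /* close() for callers cleaning up descriptors */

/* Hypothetical helpers for illustration; not part of Subspace. */

/* Control buffer sized and aligned for exactly one descriptor. */
typedef union {
  char buf[CMSG_SPACE(sizeof(int))];
  struct cmsghdr align;
} DemoControl;

/* Sends fd_to_send over a connected Unix domain socket.
 * Returns 0 on success, -1 on failure. */
static int demo_send_fd(int sock, int fd_to_send) {
  assert(sock >= 0 && fd_to_send >= 0);
  char payload = 'F'; /* one ordinary data byte accompanies the descriptor */
  struct iovec iov = {.iov_base = &payload, .iov_len = 1};
  DemoControl control;
  memset(&control, 0, sizeof(control));

  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  msg.msg_iov = &iov;
  msg.msg_iovlen = 1;
  msg.msg_control = control.buf;
  msg.msg_controllen = sizeof(control.buf);

  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SCM_RIGHTS;
  cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  memcpy(CMSG_DATA(cmsg), &fd_to_send, sizeof(int));

  return sendmsg(sock, &msg, 0) == 1 ? 0 : -1;
}

/* Receives one descriptor; returns it, or -1 on failure. */
static int demo_recv_fd(int sock) {
  assert(sock >= 0);
  char payload;
  struct iovec iov = {.iov_base = &payload, .iov_len = 1};
  DemoControl control;
  memset(&control, 0, sizeof(control));

  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  msg.msg_iov = &iov;
  msg.msg_iovlen = 1;
  msg.msg_control = control.buf;
  msg.msg_controllen = sizeof(control.buf);

  if (recvmsg(sock, &msg, 0) != 1) return -1;
  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
  if (cmsg == NULL || cmsg->cmsg_level != SOL_SOCKET ||
      cmsg->cmsg_type != SCM_RIGHTS) {
    return -1;
  }
  int fd;
  memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
  return fd; /* a new descriptor number referring to the same open file */
}
```

The received descriptor is an independent reference to the same open file, so it stays usable after the sender closes its copy or exits. That property is what lets a process holding passed descriptors keep the underlying objects alive.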
On startup the server connects to the shadow and receives a state dump. If the state dump contains channels (i.e. the shadow has state from a previous server instance), the server re-maps the existing shared memory and recreates its internal channel, publisher, and subscriber structures. It then re-replicates all recovered state back to the shadow(s) so they stay in sync.
The server supports two shadows -- a primary and a secondary -- for additional redundancy. On recovery it tries the primary first; if the primary has no state (or is unavailable) it falls back to the secondary. After recovery, both shadows are brought up to date with the full state.
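The primary-then-secondary fallback described above amounts to a small decision function. The sketch below is a hypothetical model of that rule, not code from the server. It assumes, as the text states, that a shadow is a usable recovery source only if it is reachable and its state dump contains at least one channel.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical summary of what the server learned from one shadow. */
typedef struct {
  bool available;      /* could the server connect to this shadow? */
  size_t num_channels; /* channels contained in its state dump */
} DemoShadowState;

typedef enum {
  DEMO_RECOVER_NONE,     /* fresh start: nothing to recover */
  DEMO_RECOVER_PRIMARY,
  DEMO_RECOVER_SECONDARY
} DemoRecoverySource;

/* A shadow is a usable source only if reachable and holding channels. */
static bool demo_has_state(const DemoShadowState *shadow) {
  assert(shadow != NULL);
  return shadow->available && shadow->num_channels > 0;
}

/* Prefers the primary; falls back to the secondary if the primary is
 * unavailable or empty. */
static DemoRecoverySource demo_choose_source(const DemoShadowState *primary,
                                             const DemoShadowState *secondary) {
  if (demo_has_state(primary)) return DEMO_RECOVER_PRIMARY;
  if (demo_has_state(secondary)) return DEMO_RECOVER_SECONDARY;
  return DEMO_RECOVER_NONE;
}
```

Whichever source is chosen, the text above says both shadows are brought up to date afterwards, so the choice only affects where the state is read from, not where it ends up.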
Start one or two shadow processes:
# Primary shadow
./bazel-bin/shadow/subspace_shadow --socket=/tmp/subspace_shadow
# Optional secondary shadow
./bazel-bin/shadow/subspace_shadow --socket=/tmp/subspace_shadow2
Then start the server with shadow sockets:
./bazel-bin/server/subspace_server \
--shadow_socket=/tmp/subspace_shadow \
--secondary_shadow_socket=/tmp/subspace_shadow2
If the server is killed and restarted with the same flags, it will recover its full state from whichever shadow is available.
What is preserved across a server restart:
- Channel definitions (name, slot size, number of slots, type, flags).
- Shared memory mappings -- buffers remain intact in /dev/shm (Linux) or POSIX shared memory (macOS).
- Publisher and subscriber metadata (IDs, trigger FDs, reliability settings, tunnel flags).
- The session ID, so clients can detect a server restart and reclaim their connections.
What is not preserved:
- Active client TCP connections -- clients must reconnect and re-register their publishers and subscribers.
- In-flight messages that had not yet been consumed are still in shared memory, but subscribers need to re-attach to resume reading.
Server flags:
| Flag | Default | Description |
|---|---|---|
| --shadow_socket | "" (disabled) | Unix socket path for the primary shadow process |
| --secondary_shadow_socket | "" (disabled) | Unix socket path for the secondary shadow process |
Shadow process flags:
| Flag | Default | Description |
|---|---|---|
| --socket | /tmp/subspace_shadow | Unix socket path to listen on |
| --log_level | info | Log level (debug, info, warning, error) |
See docs/shadow-process.md for the full design document including the protocol, FD lifecycle, and recovery sequence.