The official Python client library for the Posthook API -- schedule webhooks and deliver them reliably.
pip install posthook-python
Requirements: Python 3.9+. Dependencies: httpx and websockets.
import posthook
client = posthook.Posthook("pk_...")
# Schedule a webhook 5 minutes from now
hook = client.hooks.schedule(
    path="/webhooks/user-created",
    post_in="5m",
    data={"userId": "123", "event": "user.created"},
)
print(hook.id)  # UUID
print(hook.status)  # "pending"
Your Posthook project has a domain configured in the dashboard (e.g., webhook.example.com). When you schedule a hook, you specify a path (e.g., /webhooks/user-created). At the scheduled time, Posthook delivers the hook by POSTing to the full URL (https://webhook.example.com/webhooks/user-created) with your data payload and signature headers.
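As a rough illustration of how the delivery URL above comes together, the sketch below joins a domain and a path. The helper name delivery_url is hypothetical and not part of the SDK, and the https scheme is taken from the example URL rather than from any documented rule.

```python
def delivery_url(domain: str, path: str) -> str:
    """Join a project domain and a hook path into a full delivery URL.

    Hypothetical helper for illustration only; Posthook does this server-side.
    """
    # Normalise the slash between the two parts so "/x" and "x" both work.
    return f"https://{domain.rstrip('/')}/{path.lstrip('/')}"


url = delivery_url("webhook.example.com", "/webhooks/user-created")
print(url)
```

The path you pass to schedule is the only part that varies per hook; the domain stays fixed at the project level.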
You can find your API key under Project Settings in the Posthook dashboard. Pass it directly to the constructor, or set the POSTHOOK_API_KEY environment variable:
# Explicit API key
client = posthook.Posthook("pk_...")
# From environment variable
client = posthook.Posthook()  # reads POSTHOOK_API_KEY
For webhook signature verification, also provide a signing key:
client = posthook.Posthook("pk_...", signing_key="ph_sk_...")
The signing key can also be set via the POSTHOOK_SIGNING_KEY environment variable.
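The lookup described above (explicit argument or environment variable) can be sketched as follows. This is not the SDK's code: resolve_key is a hypothetical helper, and the precedence shown (explicit value wins over the environment) is an assumption, as is the error raised when neither is present.

```python
import os
from typing import Optional


def resolve_key(explicit: Optional[str], env_var: str) -> str:
    """Return the explicit value if given, else the named environment variable."""
    # Assumption: an explicitly passed key takes precedence over the environment.
    value = explicit if explicit is not None else os.environ.get(env_var)
    if not value:
        raise ValueError(f"missing key: pass it explicitly or set {env_var}")
    return value


os.environ["POSTHOOK_API_KEY"] = "pk_from_env"  # placeholder value for the demo
api_key = resolve_key(None, "POSTHOOK_API_KEY")
signing_key = resolve_key("ph_sk_explicit", "POSTHOOK_SIGNING_KEY")
print(api_key, signing_key)
```

Keeping keys in the environment avoids hard-coding secrets in source files.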
Three mutually exclusive scheduling modes are available. You must provide exactly one of post_in, post_at, or post_at_local.
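If you build schedule arguments dynamically, it can help to check the "exactly one" rule before making the call. A minimal sketch, assuming a hypothetical helper named scheduling_mode (not part of the SDK); how the SDK or API itself reports a violation is not covered here.

```python
def scheduling_mode(post_in=None, post_at=None, post_at_local=None) -> str:
    """Return the name of the single scheduling mode that was provided."""
    candidates = {
        "post_in": post_in,
        "post_at": post_at,
        "post_at_local": post_at_local,
    }
    given = [name for name, value in candidates.items() if value is not None]
    if len(given) != 1:
        # Zero modes or several modes are both rejected.
        raise ValueError(
            f"provide exactly one of post_in, post_at, post_at_local (got {given})"
        )
    return given[0]


mode = scheduling_mode(post_in="5m")
print(mode)
```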
Schedule after a relative delay. Accepts s (seconds), m (minutes), h (hours), or d (days):
hook = client.hooks.schedule(
    path="/webhooks/send-reminder",
    post_in="30m",
    data={"userId": "123"},
)
Schedule at an exact UTC time. Accepts datetime objects or RFC 3339 strings:
from datetime import datetime, timedelta, timezone
# Using a datetime object (automatically converted to UTC)
hook = client.hooks.schedule(
    path="/webhooks/send-reminder",
    post_at=datetime.now(timezone.utc) + timedelta(hours=1),
    data={"userId": "123"},
)
# Using an RFC 3339 string
hook = client.hooks.schedule(
    path="/webhooks/send-reminder",
    post_at="2026-06-15T10:00:00Z",
    data={"userId": "123"},
)
Schedule at a local time. Posthook handles DST transitions automatically:
hook = client.hooks.schedule(
    path="/webhooks/daily-digest",
    post_at_local="2026-03-01T09:00:00",
    timezone="America/New_York",
    data={"userId": "123"},
)
Override your project's default retry behavior for a specific hook:
hook = client.hooks.schedule(
    path="/webhooks/critical",
    post_in="1m",
    data={"orderId": "456"},
    retry_override=posthook.HookRetryOverride(
        min_retries=10,
        delay_secs=15,
        strategy="exponential",
        backoff_factor=2.0,
        max_delay_secs=3600,
        jitter=True,
    ),
)
# Fetch a single hook by ID
hook = client.hooks.get("hook-uuid")
# List hooks, optionally filtered
hooks = client.hooks.list(status=posthook.STATUS_FAILED, limit=50)
print(f"Found {len(hooks)} hooks")
All list parameters are optional:
| Parameter | Description |
|---|---|
| status | Filter by status: "pending", "retry", "completed", "failed" |
| limit | Max results per page |
| sort_by | Sort field (e.g., "createdAt", "postAt") |
| sort_order | "ASC" or "DESC" |
| post_at_before | Filter hooks scheduled before this time (ISO string) |
| post_at_after | Cursor: hooks scheduled after this time (ISO string) |
| created_at_before | Filter hooks created before this time (ISO string) |
| created_at_after | Filter hooks created after this time (ISO string) |
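The time filters in the table take ISO strings. A minimal sketch of producing one from a timezone-aware datetime, using only the standard library; to_iso_utc is a hypothetical helper, and the trailing Z form is chosen to match the RFC 3339 examples elsewhere in this README rather than any documented requirement.

```python
from datetime import datetime, timedelta, timezone


def to_iso_utc(moment: datetime) -> str:
    """Format an aware datetime as a UTC ISO 8601 string with a trailing Z."""
    if moment.tzinfo is None:
        # A naive datetime has no defined instant, so refuse to guess.
        raise ValueError("expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


eastern_standard = timezone(timedelta(hours=-5))  # fixed offset, for the demo only
cutoff = to_iso_utc(datetime(2026, 2, 1, 9, 0, tzinfo=eastern_standard))
print(cutoff)
```

The resulting string could then be passed as, for example, created_at_before.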
Use post_at_after as a cursor. After each page, advance it to the last hook's post_at:
limit = 100
cursor = None
while True:
    hooks = client.hooks.list(status="failed", limit=limit, post_at_after=cursor)
    for hook in hooks:
        print(hook.id, hook.failure_error)
    if len(hooks) < limit:
        break  # last page
    cursor = hooks[-1].post_at.isoformat()
For convenience, list_all yields every matching hook across all pages automatically:
for hook in client.hooks.list_all(status="failed"):
    process(hook)
The async client returns an async iterator:
async for hook in client.hooks.list_all(status="failed"):
    await process(hook)
To cancel a pending hook, delete it before delivery. The call is idempotent: it returns None on both 200 (deleted) and 404 (already deleted):
client.hooks.delete("hook-uuid")
Three bulk operations are available, each supporting by-IDs or by-filter:
- Retry -- Re-attempts delivery for failed hooks
- Replay -- Re-delivers completed hooks (useful for reprocessing)
- Cancel -- Cancels pending hooks before delivery
result = client.hooks.bulk.retry(["id-1", "id-2", "id-3"])
print(f"Retried {result.affected} hooks")
result = client.hooks.bulk.cancel_by_filter(
    start_time="2026-02-01T00:00:00Z",
    end_time="2026-02-22T00:00:00Z",
    limit=500,
    endpoint_key="/webhooks/deprecated",
)
print(f"Cancelled {result.affected} hooks")
All six methods:
# By IDs
client.hooks.bulk.retry(hook_ids)
client.hooks.bulk.replay(hook_ids)
client.hooks.bulk.cancel(hook_ids)
# By filter
client.hooks.bulk.retry_by_filter(start_time, end_time, limit, ...)
client.hooks.bulk.replay_by_filter(start_time, end_time, limit, ...)
client.hooks.bulk.cancel_by_filter(start_time, end_time, limit, ...)
Filter methods also accept optional endpoint_key and sequence_id keyword arguments.
When Posthook delivers a hook to your endpoint, it includes signature headers for verification. Use parse_delivery to verify and parse the delivery.
Important: You must pass the raw request body (bytes or string), not a parsed JSON object.
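To see why the raw body matters, consider a generic HMAC check. The sketch below is not Posthook's actual signing scheme (this README does not describe it); it assumes a plain HMAC-SHA256 over the body with a placeholder key, purely to show that re-serializing parsed JSON changes the bytes and therefore the signature.

```python
import hashlib
import hmac
import json

secret = b"example-signing-key"  # placeholder, not a real Posthook key
# Bytes exactly as they arrived on the wire, including irregular whitespace.
raw_body = b'{"userId": "123",  "event":"user.created"}'


def sign(body: bytes) -> str:
    """Generic HMAC-SHA256 hex digest, standing in for the real scheme."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


signature = sign(raw_body)  # what the sender would have computed

# Parsing and re-encoding yields equivalent JSON but different bytes.
reserialized = json.dumps(json.loads(raw_body)).encode()

raw_matches = hmac.compare_digest(sign(raw_body), signature)
reserialized_matches = hmac.compare_digest(sign(reserialized), signature)
print(raw_matches, reserialized_matches)
```

The first comparison succeeds and the second fails, which is why the framework examples read the unparsed request body.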
# Flask
from flask import Flask, request
import posthook
app = Flask(__name__)
client = posthook.Posthook("pk_...", signing_key="ph_sk_...")
@app.route("/webhooks/user-created", methods=["POST"])
def handle_webhook():
    try:
        delivery = client.signatures.parse_delivery(
            body=request.get_data(),
            headers=dict(request.headers),
        )
    except posthook.SignatureVerificationError:
        return "invalid signature", 401
    print(delivery.hook_id)  # from Posthook-Id header
    print(delivery.path)  # "/webhooks/user-created"
    print(delivery.data)  # your custom data payload
    print(delivery.post_at)  # when it was scheduled
    print(delivery.posted_at)  # when it was delivered
    return "", 200
# Django
from django.http import HttpResponse
import posthook
client = posthook.Posthook("pk_...", signing_key="ph_sk_...")
def handle_webhook(request):
    try:
        delivery = client.signatures.parse_delivery(
            body=request.body,
            headers=dict(request.headers),
        )
    except posthook.SignatureVerificationError:
        return HttpResponse(status=401)
    print(delivery.hook_id)
    print(delivery.data)
    return HttpResponse(status=200)
# FastAPI
from fastapi import FastAPI, Request, Response
import posthook
app = FastAPI()
client = posthook.Posthook("pk_...", signing_key="ph_sk_...")
@app.post("/webhooks/user-created")
async def handle_webhook(request: Request):
    body = await request.body()
    try:
        delivery = client.signatures.parse_delivery(
            body=body,
            headers=dict(request.headers),
        )
    except posthook.SignatureVerificationError:
        return Response(status_code=401)
    print(delivery.hook_id)
    print(delivery.data)
    return Response(status_code=200)
By default, signatures older than 5 minutes are rejected. You can override this:
delivery = client.signatures.parse_delivery(
    body=raw_body,
    headers=headers,
    tolerance=600,  # 10 minutes, in seconds
)
The WebSocket listener receives hooks in real time without running an HTTP server. Use AsyncPosthook and call hooks.listen() with an async handler:
import asyncio
import posthook
async def main():
    async with posthook.AsyncPosthook("pk_...") as client:
        async def on_hook(delivery):
            print(f"Received: {delivery.hook_id} -> {delivery.path}")
            print(f"Data: {delivery.data}")
            return posthook.Result.ack()
        listener = await client.hooks.listen(
            on_hook,
            on_connected=lambda info: print(f"Connected: {info.project_name}"),
        )
        await listener.wait()  # Blocks until closed
asyncio.run(main())
Your handler must return a Result:
| Factory | Effect |
|---|---|
| Result.ack() | Processing complete: hook is marked as delivered immediately |
| Result.nack(error?) | Reject: triggers retry according to project settings |
| Result.accept(timeout) | Async: you have timeout seconds to call back via HTTP (see below) |
return posthook.Result.ack()
return posthook.Result.nack("processing failed")
return posthook.Result.accept(timeout=120)
Use accept when your handler needs more time than the 10-second ack window.
After returning accept, POST to the callback URLs on the delivery to report
the outcome:
async def on_hook(delivery):
    # Kick off background work, save the callback URLs
    await queue.enqueue("process", {
        "data": delivery.data,
        "ack_url": delivery.ack_url,
        "nack_url": delivery.nack_url,
    })
    return posthook.Result.accept(timeout=300)  # 5 minutes to call back
# Later, in the background worker:
await posthook.async_ack(job["ack_url"])
# or on failure:
await posthook.async_nack(job["nack_url"], {"error": "failed"})
If neither URL is called before the deadline, the hook is retried.
By default, handlers run with unlimited concurrency (matching HTTP delivery behavior). Set max_concurrency to limit parallel handlers — deliveries that arrive while at capacity are nacked immediately so the server can retry them:
listener = await client.hooks.listen(on_hook, max_concurrency=10)
# Connection lifecycle callbacks
listener = await client.hooks.listen(
    on_hook,
    on_connected=lambda info: print(f"Connected: {info.connection_id}"),
    on_disconnected=lambda err: print(f"Disconnected: {err}"),
    on_reconnecting=lambda attempt: print(f"Reconnecting (attempt {attempt})"),
)
For manual control over ack/nack, use hooks.stream(), which returns an async iterator:
async with posthook.AsyncPosthook("pk_...") as client:
    async with await client.hooks.stream() as stream:
        async for delivery in stream:
            print(delivery.hook_id, delivery.data)
            if should_process(delivery):
                await stream.ack(delivery.hook_id)
            else:
                await stream.nack(delivery.hook_id, "not ready")
If your project has a domain configured, hooks are delivered via HTTP when no
WebSocket listener is connected. You can run both an HTTP endpoint and a
WebSocket listener — the server uses WebSocket when available and falls back to
HTTP automatically. Since both paths use the same Result type, you can share
your handler logic:
async def process_hook(delivery):
    await process_order(delivery.data)
    return posthook.Result.ack()
# HTTP delivery (ASGI endpoint)
app = signatures.asgi_handler(process_hook)
# WebSocket delivery (runs alongside)
listener = await client.hooks.listen(process_hook)
Deliveries received via WebSocket include a ws field with attempt info:
async def on_hook(delivery):
    if delivery.ws:
        print(f"Attempt {delivery.ws.attempt}/{delivery.ws.max_attempts}")
        if delivery.ws.forward_request:
            print(f"Original body: {delivery.ws.forward_request.body}")
    return posthook.Result.ack()
For quick integration without a full web framework, SignaturesService provides handler wrappers:
import posthook
signatures = posthook.create_signatures("ph_sk_...")
async def on_hook(delivery):
    print(delivery.data)
    return posthook.Result.ack()
# Mount as an ASGI endpoint (e.g. with uvicorn)
app = signatures.asgi_handler(on_hook)
# WSGI variant with a synchronous handler
import posthook
signatures = posthook.create_signatures("ph_sk_...")
def on_hook(delivery):
    print(delivery.data)
    return posthook.Result.ack()
# Mount as a WSGI endpoint (e.g. with gunicorn)
app = signatures.wsgi_handler(on_hook)
When async hooks are enabled, parse_delivery() populates ack_url and nack_url on the delivery object. Return 202 from your handler and call back when processing completes.
from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
from fastapi.responses import Response
import posthook
app = FastAPI()
client = posthook.Posthook("pk_...", signing_key="ph_sk_...")
async def process_and_ack(delivery):
    try:
        await process_video(delivery.data["video_id"])
        result = await posthook.async_ack(delivery.ack_url)
        print(f"Applied: {result.applied}")
    except Exception as e:
        await posthook.async_nack(delivery.nack_url, {"error": str(e)})
@app.post("/webhooks/process-video")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    body = await request.body()
    try:
        delivery = client.signatures.parse_delivery(body=body, headers=dict(request.headers))
    except posthook.SignatureVerificationError:
        raise HTTPException(status_code=401)
    background_tasks.add_task(process_and_ack, delivery)
    return Response(status_code=202)
The SDK provides standalone callback functions -- pass the URL from the delivery object:
# Sync (Flask, Django, background workers)
result = posthook.ack(delivery.ack_url)
result = posthook.nack(delivery.nack_url, {"error": "processing failed"})
# Async (FastAPI, etc.)
result = await posthook.async_ack(delivery.ack_url)
result = await posthook.async_nack(delivery.nack_url, {"error": "processing failed"})
Both return a CallbackResult:
result = posthook.ack(delivery.ack_url)
print(result.applied)  # True if state changed, False if already resolved
print(result.status)  # "completed", "not_found", "conflict", etc.
ack() and nack() return normally for 200, 404, and 409 responses. They raise CallbackError for 401 (invalid token) and 410 (expired).
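The status handling just described can be summarised in a small sketch. It has no dependency on the SDK: CallbackRejected is a stand-in for posthook.CallbackError, callback_outcome is a hypothetical helper, and the "unspecified" branch only marks codes this README does not cover.

```python
class CallbackRejected(Exception):
    """Stand-in for posthook.CallbackError so the sketch has no dependencies."""


RETURNS_NORMALLY = {200, 404, 409}
RAISES = {401: "invalid token", 410: "expired"}


def callback_outcome(status_code: int) -> str:
    """Mirror the documented behaviour of ack()/nack() for a response code."""
    if status_code in RETURNS_NORMALLY:
        return "returned"
    if status_code in RAISES:
        raise CallbackRejected(f"{status_code}: {RAISES[status_code]}")
    # The README does not say what happens for other codes.
    return "unspecified"


outcomes = [callback_outcome(code) for code in (200, 404, 409)]
print(outcomes)
```

In practice this means a worker only needs an except clause for the 401 and 410 cases; an already-resolved hook is reported through the result rather than as an exception.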
If processing happens in a separate worker, use the raw callback URLs instead:
queue.enqueue("transcode", {
    "video_id": delivery.data["video_id"],
    "ack_url": delivery.ack_url,
    "nack_url": delivery.nack_url,
})
All API errors extend PosthookError and can be caught with isinstance or except:
import posthook
try:
    hook = client.hooks.get("hook-id")
except posthook.RateLimitError:
    print("Rate limited, retry later")
except posthook.AuthenticationError:
    print("Invalid API key")
except posthook.NotFoundError:
    print("Hook not found")
except posthook.PosthookError as err:
    print(f"API error: {err.message} (status={err.status_code})")
| Error class | HTTP Status | Code |
|---|---|---|
| BadRequestError | 400 | bad_request |
| AuthenticationError | 401 | authentication_error |
| ForbiddenError | 403 | forbidden |
| NotFoundError | 404 | not_found |
| PayloadTooLargeError | 413 | payload_too_large |
| RateLimitError | 429 | rate_limit_exceeded |
| InternalServerError | 5xx | internal_error |
| PosthookConnectionError | -- | connection_error |
| SignatureVerificationError | -- | signature_verification_error |
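For reference, the status-to-class mapping in the table can be written out as a lookup. This is an illustrative sketch, not the SDK's internal dispatch: error_name_for is a hypothetical helper, it returns class names as strings so it runs without the SDK installed, and the PosthookError fallback for unlisted codes is an assumption.

```python
ERROR_NAME_BY_STATUS = {
    400: "BadRequestError",
    401: "AuthenticationError",
    403: "ForbiddenError",
    404: "NotFoundError",
    413: "PayloadTooLargeError",
    429: "RateLimitError",
}


def error_name_for(status_code: int) -> str:
    """Return the error class name the table associates with a status code."""
    if status_code in ERROR_NAME_BY_STATUS:
        return ERROR_NAME_BY_STATUS[status_code]
    if 500 <= status_code <= 599:
        return "InternalServerError"  # the table lists this for any 5xx
    # Assumption: anything else surfaces as the base class.
    return "PosthookError"


names = [error_name_for(code) for code in (404, 429, 503)]
print(names)
```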
client = posthook.Posthook(
    "pk_...",
    base_url="https://api.staging.posthook.io",
    timeout=60,
    signing_key="ph_sk_...",
)
| Option | Description | Default |
|---|---|---|
| api_key | Your Posthook API key | POSTHOOK_API_KEY env var |
| base_url | Custom API base URL | https://api.posthook.io |
| timeout | Request timeout in seconds | 30 |
| signing_key | Signing key for webhook verification | POSTHOOK_SIGNING_KEY env var |
| http_client | Custom httpx.Client instance | -- |
After scheduling a hook, quota information is available on the returned Hook object:
hook = client.hooks.schedule(path="/test", post_in="5m")
if hook.quota:
    print(f"Limit: {hook.quota.limit}")
    print(f"Usage: {hook.quota.usage}")
    print(f"Remaining: {hook.quota.remaining}")
    print(f"Resets at: {hook.quota.resets_at}")
The AsyncPosthook client provides an identical API -- just await each call:
import posthook
async with posthook.AsyncPosthook("pk_...") as client:
    hook = await client.hooks.schedule(path="/test", post_in="5m")
    print(hook.id)
    hooks = await client.hooks.list(status="pending")
Both the sync and async clients support context managers for automatic cleanup:
# Sync
with posthook.Posthook("pk_...") as client:
    hook = client.hooks.schedule(path="/test", post_in="5m")
# Async
async with posthook.AsyncPosthook("pk_...") as client:
    hook = await client.hooks.schedule(path="/test", post_in="5m")
You can also call close() / await close() manually if you prefer.
The SDK logs all requests via Python's logging module under the "posthook" logger. Enable it to see request details:
import logging
logging.basicConfig(level=logging.DEBUG)
Example output:
DEBUG:posthook:POST /v1/hooks -> 200 (0.153s)
DEBUG:posthook:GET /v1/hooks -> 200 (0.089s)
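basicConfig at DEBUG level turns on debug output for every library in the process. If you only want the SDK's request logs, one option is to configure the "posthook" logger on its own. The sketch below uses only the standard logging module; the logger name comes from this README, and the format string is chosen to match the example output above.

```python
import logging

# Attach a dedicated handler so only the "posthook" logger is affected.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))

posthook_logger = logging.getLogger("posthook")
posthook_logger.setLevel(logging.DEBUG)
posthook_logger.addHandler(handler)
# Stop records from also reaching root handlers, which would print them twice.
posthook_logger.propagate = False
```

Other loggers keep their existing levels, so third-party debug output stays quiet.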
Pass a custom httpx.Client configured with a proxy:
import httpx
import posthook
# Note: the proxy argument was added in httpx 0.26; older releases use proxies
http_client = httpx.Client(proxy="http://proxy.example.com:8080")
client = posthook.Posthook("pk_...", http_client=http_client)
# Custom CA bundle
import httpx
import posthook
http_client = httpx.Client(verify="/path/to/custom-ca-bundle.crt")
client = posthook.Posthook("pk_...", http_client=http_client)
For full control over HTTP behavior, provide your own httpx.Client (sync) or httpx.AsyncClient (async). The SDK will add its authentication headers automatically:
import httpx
import posthook
http_client = httpx.Client(
    timeout=60,
    verify=True,
    proxy="http://proxy.example.com:8080",
    limits=httpx.Limits(max_connections=20),
)
client = posthook.Posthook("pk_...", http_client=http_client)
When you provide a custom client, the SDK does not close it on client.close() -- you are responsible for its lifecycle.
- Documentation — guides, concepts, and patterns
- API Reference — endpoint specs and examples
- Quickstart — get started in under 2 minutes
- Pricing — free tier included
- Status — uptime and incident history
- Python 3.9+
- httpx >= 0.25.0
- websockets >= 12.0 (for WebSocket listener/stream)