Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ services:
- AWS_DEFAULT_REGION=us-east-1
- AWS_ACCESS_KEY_ID=dummy
- AWS_SECRET_ACCESS_KEY=dummy
volumes:
- "./test:/usr/test"
depends_on:
localstack-persist:
condition: service_healthy
29 changes: 25 additions & 4 deletions src/localstack_persist/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from localstack.services.plugins import SERVICE_PLUGINS
from localstack.aws.api import RequestContext
from collections import defaultdict
from threading import Thread, Condition
from threading import Thread, Condition, Timer
from readerwriterlock.rwlock import RWLockWrite, Lockable
from .visitors import LoadStateVisitor, SaveStateVisitor
from .config import BASE_DIR, is_persistence_enabled, PERSIST_FREQUENCY
Expand Down Expand Up @@ -71,10 +71,16 @@ def on_request(self, chain, context: RequestContext, response):
if service_name not in self.loaded_services:
self._load_service_state(service_name)

# Prevent persistence from running for this service while handling this request
# Prevent persistence from running for this service while handling this request...
rlock = self.rwlocks[service_name].gen_rlock()
setattr(context, "localstack-persist_rlock", rlock)
rlock.acquire()
# ...unless the request takes over 1 second, in which case we force release the lock to
# prevent long-running requests from blocking persistence which would in turn block other
# requests
timer = Timer(1, try_release, [rlock])
setattr(context, "localstack-persist_rlock_timer", timer)
timer.start()

def on_response(self, chain, context: RequestContext, response):
if not context.service or not context.request or not context.operation:
Expand All @@ -94,8 +100,15 @@ def on_response(self, chain, context: RequestContext, response):
self.add_affected_service(service_name)

def on_finalize(self, chain, context: RequestContext, response):
if rlock := getattr(context, "localstack-persist_rlock", None):
cast(Lockable, rlock).release()
if rlock := cast(
Lockable | None, getattr(context, "localstack-persist_rlock", None)
):
try_release(rlock)

if timer := cast(
Timer | None, getattr(context, "localstack-persist_rlock_timer", None)
):
timer.cancel()

def load_all_services_state(self):
LOG.info("Loading persisted state of all services...")
Expand Down Expand Up @@ -182,3 +195,11 @@ def _save_service_state(self, service_name: str):


STATE_TRACKER = StateTracker()


def try_release(lock: Lockable):
if lock and lock.locked():
try:
lock.release()
except:
pass
4 changes: 2 additions & 2 deletions test/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM python

RUN pip3 install boto3==1.28.80 botocore==1.31.80
WORKDIR /usr/test

COPY . .
RUN pip3 install boto3==1.28.80 botocore==1.31.80

ENTRYPOINT [ "python", "./main.py" ]
4 changes: 2 additions & 2 deletions test/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ def wait_until_es_ready(domain_name: str):
assert_equal(queue.attributes["ApproximateNumberOfMessages"], "0")

table = dynamodb.Table("test-table")
item = table.get_item(Key={"id": 123})["Item"]
assert_equal(item["foo"], "bar")
item = table.get_item(Key={"id": 123}).get("Item", {})
assert_equal(item.get("foo"), "bar")

bucket = s3.Bucket("test-bucket")
obj = bucket.Object("test-object")
Expand Down
Loading