-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgrpc_client.py
More file actions
148 lines (120 loc) · 4.35 KB
/
Copy pathgrpc_client.py
File metadata and controls
148 lines (120 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""gRPC client example — async, with Subscribe stream for GRANT events.
Uses the native gRPC wire (not the HTTP/JSON facade). Requires Python stubs
generated from the proto (see Setup below), then drives a mutex-style
contention scenario: two workers acquire the same path; the second is
enqueued (QUEUED) and waits for a GRANT event on its Subscribe stream.
Setup (one-time):
pip install grpcio grpcio-tools
python -m grpc_tools.protoc -I proto \\
--python_out=examples/python/generated \\
--grpc_python_out=examples/python/generated \\
proto/pathlockd.proto
Run: PYTHONPATH=examples/python/generated python3 examples/python/grpc_client.py
Prereq: pathlockd running (default grpc://localhost:50051).
"""
import asyncio
import os
import sys
# Put the "generated" directory that sits next to this file at the front of
# sys.path so the pathlockd_pb2 / pathlockd_pb2_grpc imports below resolve
# even when PYTHONPATH was not set by the caller.
generated = os.path.join(os.path.dirname(__file__), "generated")
if generated not in sys.path:
    sys.path.insert(0, generated)
import grpc.aio
from pathlockd_pb2 import (
AcquireRequest,
LockRequest,
ReleaseRequest,
ReleaseLocksRequest,
ReleaseAllRequest,
RenewRequest,
AssertFencingRequest,
SubscribeRequest,
)
from pathlockd_pb2_grpc import PathLockStub
# Server address for the gRPC channel; override with PATHLOCKD_GRPC_ADDR.
GRPC_ADDR = os.getenv("PATHLOCKD_GRPC_ADDR", "localhost:50051")
# The single lock path both demo workers contend for.
PATH = "mutex:/grpc-critical"
# Lock TTL in milliseconds, sent on Acquire and Renew; the renew loop fires
# every third of this interval.
TTL_MS = 30_000
async def wait_for_grant(stub, owner_id):
    """Wait on the Subscribe stream of *owner_id* until a GRANT event arrives.

    Args:
        stub: PathLock stub whose ``Subscribe`` RPC is a server stream.
        owner_id: Owner whose event stream is consumed.

    Returns:
        None, as soon as an event with ``type == 3`` is received.

    Raises:
        RuntimeError: If an event with ``type == 1`` (reported as KILLED)
            arrives, or if the stream ends without delivering a GRANT.
    """
    sub = stub.Subscribe(SubscribeRequest(owner_id=owner_id))
    try:
        async for event in sub:
            # NOTE(review): 3 and 1 are treated as GRANT and KILLED here;
            # presumably they mirror the proto's event enum — confirm and
            # switch to the generated enum constants.
            if event.type == 3:
                return
            if event.type == 1:
                raise RuntimeError(f"{owner_id} was KILLED while waiting")
        raise RuntimeError("Subscribe stream closed without GRANT")
    finally:
        # Leaving the loop early does not end a server-streaming RPC; cancel
        # it explicitly so the stream is not left open until garbage
        # collection. cancel() is a no-op on an already-terminated call.
        sub.cancel()
async def worker(stub, owner, start_delay):
    """Run one contender: acquire PATH, hold it briefly, fence-check, release.

    Flow:
      1. Sleep *start_delay* seconds, then call Acquire with a queue TTL.
      2. If the response status is 3 (printed as QUEUED), wait for a GRANT
         on the Subscribe stream and re-issue Acquire.
      3. While holding the lock, keep it alive with a background renew task,
         sleep 2 s to simulate work, and call AssertFencing.
      4. Always stop the renew task and release the lock, even when the
         critical section fails.

    Args:
        stub: PathLock stub used for all RPCs.
        owner: Owner id used for every request and as the log prefix.
        start_delay: Seconds to wait before the first Acquire.

    Raises:
        RuntimeError: If the (re-)acquire or the fencing assertion returns a
            non-zero status, or if waiting for the GRANT fails.
    """
    await asyncio.sleep(start_delay)
    print(f"[{owner}] acquiring {PATH} ...")
    resp = await stub.Acquire(AcquireRequest(
        owner_id=owner,
        ttl_ms=TTL_MS,
        requests=[LockRequest(path=PATH, mode=0)],
        queue_ttl_ms=60_000,
    ))
    if resp.status == 3:
        blocker = resp.owner
        reason = resp.reason
        print(f"[{owner}] QUEUED behind {blocker} (reason={reason}); "
              f"waiting for GRANT on Subscribe stream ...")
        # NOTE(review): the subscription is opened only after the QUEUED
        # response; this presumably relies on the server delivering a GRANT
        # issued in between — confirm against the server's semantics.
        await wait_for_grant(stub, owner)
        print(f"[{owner}] GRANT received — re-issuing acquire ...")
        resp = await stub.Acquire(AcquireRequest(
            owner_id=owner,
            ttl_ms=TTL_MS,
            requests=[LockRequest(path=PATH, mode=0)],
        ))
    if resp.status != 0:
        raise RuntimeError(f"[{owner}] acquire failed: {resp}")
    fence = resp.fencing_token
    ns = resp.namespace
    print(f"[{owner}] holding lock (fence={fence}, ns={ns})")
    renew_task = asyncio.create_task(renew_loop(stub, owner, ns))
    try:
        await asyncio.sleep(2)
        print(f"[{owner}] asserting fencing before backing-store write ...")
        assert_resp = await stub.AssertFencing(AssertFencingRequest(
            owner_id=owner,
            fencing_token=fence,
            paths=[PATH],
        ))
        if assert_resp.status != 0:
            raise RuntimeError(f"[{owner}] fencing failed: {assert_resp}")
        print(f"[{owner}] fence OK — proceeding with write")
    finally:
        # Stop renewing first; otherwise a failed critical section would
        # leave an orphaned task extending the lock indefinitely.
        renew_task.cancel()
        try:
            await renew_task
        except asyncio.CancelledError:
            pass
        finally:
            # Release on every exit path so a failure does not leave the
            # lock held until its TTL expires.
            print(f"[{owner}] releasing ...")
            await stub.Release(ReleaseLocksRequest(
                owner_id=owner,
                requests=[ReleaseRequest(path=PATH, mode=0)],
            ))
    print(f"[{owner}] done.")
async def renew_loop(stub, owner, namespace):
    """Periodically renew *owner*'s lease until a renew is rejected.

    Sleeps for one third of the TTL, sends a Renew request, and repeats.
    Returns (after logging) on the first response whose status is non-zero;
    otherwise runs until the task is cancelled.

    Args:
        stub: PathLock stub providing the ``Renew`` RPC.
        owner: Owner id whose lease is renewed.
        namespace: Passed as the only entry of ``domains`` when truthy;
            an empty ``domains`` list is sent otherwise.
    """
    ttl_seconds = TTL_MS / 1000
    pause = ttl_seconds / 3
    # The domain list never changes between iterations, so build it once.
    renew_domains = [namespace] if namespace else []
    while True:
        await asyncio.sleep(pause)
        request = RenewRequest(
            owner_id=owner,
            ttl_ms=TTL_MS,
            domains=renew_domains,
        )
        reply = await stub.Renew(request)
        if reply.status == 0:
            print(f"[{owner}] renewed")
            continue
        print(f"[{owner}] renew LOST: {reply}")
        return
async def main():
    """Run the two-worker contention demo against the server at GRPC_ADDR.

    Opens an insecure channel, clears leftover state for both demo owners
    via ReleaseAll, then runs the workers concurrently; the second worker
    starts one second after the first.
    """
    owners = ("grpc-worker-A", "grpc-worker-B")
    async with grpc.aio.insecure_channel(GRPC_ADDR) as channel:
        stub = PathLockStub(channel)
        await asyncio.sleep(0.2)
        # Start from a clean slate: drop anything still held or queued by
        # the demo owners from an earlier run.
        for name in owners:
            cleanup = ReleaseAllRequest(owner_id=name, del_wait_key=True)
            await stub.ReleaseAll(cleanup)
        # The position in `owners` doubles as the start delay in seconds.
        runs = [worker(stub, name, delay) for delay, name in enumerate(owners)]
        await asyncio.gather(*runs)
        print("gRPC demo complete.")
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(main())