Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions tests/cvcuda/python/test_multi_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def test_multiple_streams():
stream3 = cvcuda.Stream() # create a new stream
assert stream1 is not stream2
assert stream1 is not stream3
assert stream2 is not stream3
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a != b and a != c does not imply that b != c.

assert cvcuda.Stream.current is cvcuda.Stream.default
assert cvcuda.Stream.current is not stream1
assert cvcuda.Stream.current is not stream2
Expand Down Expand Up @@ -54,16 +55,18 @@ def test_stream_context_nested():
assert cvcuda.Stream.current is stream2
assert cvcuda.Stream.current is cvcuda.Stream.default

class _CatchThisException(Exception):
"""A test specific Exception to check that we raise the correct Exception."""

def test_stream_context_exception():
stream1 = cvcuda.Stream() # create a new stream
stream2 = cvcuda.Stream() # create a new stream
with t.raises(Exception):
with t.raises(_CatchThisException):
with stream1:
assert cvcuda.Stream.current is stream1
with stream2:
assert cvcuda.Stream.current is stream2
raise Exception()
raise _CatchThisException()
assert cvcuda.Stream.current is stream1
assert cvcuda.Stream.current is cvcuda.Stream.default
Comment on lines -61 to 71
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that the purpose of this test is to check that the Exception that we raised is the one that is raised. Since Exception is the base class for all exceptions, I believe that we need to make a special exception. Otherwise, catching any Exception will cause the test to pass.

with stream2:
Expand All @@ -77,6 +80,7 @@ def test_operator_stream():
stream3 = cvcuda.Stream() # create a new stream
assert stream1 is not stream2
assert stream1 is not stream3
assert stream2 is not stream3
assert cvcuda.Stream.current is cvcuda.Stream.default
assert cvcuda.Stream.current is not stream1
assert cvcuda.Stream.current is not stream2
Expand All @@ -100,6 +104,9 @@ def test_operator_stream():
cvcuda.cvtcolor(img, cvcuda.ColorConversion.BGR2GRAY)
assert cvcuda.Stream.current is stream3
assert cvcuda.Stream.current is cvcuda.Stream.default
stream1.sync()
stream2.sync()
stream3.sync()
Comment on lines +107 to +109
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You want to wait for all the queued work to complete before the test returns.



def test_operator_changing_stream():
Expand All @@ -110,6 +117,7 @@ def test_operator_changing_stream():
C = 3
Loop = 50
streams = [cvcuda.Stream() for _ in range(4)] # create a list of streams
torch_streams = [torch.cuda.ExternalStream(s.handle) for s in streams]

inputTensor = torch.randint(0, 256, (N, H, W, C), dtype=torch.uint8).cuda()
outputTensor = torch.randint(0, 256, (N, H, W, C), dtype=torch.uint8).cuda()
Expand All @@ -119,11 +127,16 @@ def test_operator_changing_stream():
inTensor = cvcuda.as_tensor(inputTensor.data, "NHWC")
outTensor = cvcuda.as_tensor(outputTensor.data, "NHWC")

prev_torch_stream = None
for _ in range(Loop):
for stream in streams:
for stream, torch_stream in zip(streams, torch_streams, strict=True):
if prev_torch_stream is not None:
torch_stream.wait_stream(prev_torch_stream)
cvcuda.flip_into(outTensor, inTensor, -1, stream=stream) # output x flipped
cvcuda.flip_into(inTensor, outTensor, -1, stream=stream) # output y flipped
prev_torch_stream = torch_stream
Comment on lines +130 to +137
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same buffer is shared by multiple streams. If you don't synchronize the streams using events or wait, the behavior is undefined because all of the streams will modify the data simultaneously.


torch_streams[-1].synchronize()
final_out = torch.as_tensor(inTensor.cuda()).cpu()
assert torch.equal(final_out, inputTensor_copy.cpu())

Expand All @@ -138,6 +151,9 @@ def test_operator_changing_stream_loaded():
stream1 = cvcuda.Stream()
stream2 = cvcuda.Stream()

torch_stream1 = torch.cuda.ExternalStream(stream1.handle)
torch_stream2 = torch.cuda.ExternalStream(stream2.handle)

inputTensor = torch.randint(0, 256, (N, H, W, C), dtype=torch.uint8).cuda()
inputTensorTmp = torch.randint(0, 256, (N, H, W, C), dtype=torch.uint8).cuda()
outputTensor = torch.randint(0, 256, (N, H, W, C), dtype=torch.uint8).cuda()
Expand All @@ -156,9 +172,11 @@ def test_operator_changing_stream_loaded():
cvcuda.flip_into(
inTensorTmp, inTensor, -1, stream=stream1
) # output x/y flipped
torch_stream2.wait_stream(torch_stream1)
cvcuda.flip_into(
outTensor, inTensorTmp, -1, stream=stream2
) # output y/y flipped

torch_stream2.synchronize()
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that torch has an implicit wait when converting from cuda to cpu tensor, so we need to have the host wait for the stream to complete before getting the result.

final_out = torch.as_tensor(outTensor.cuda()).cpu()
assert torch.equal(final_out, inputTensor_copy.cpu())