Cancellation and Deadline
Cancellation and deadline are mechanisms that help manage the lifecycle of Remote Procedure Calls.
They are especially useful in distributed systems, where network delays, failures, or long-running operations can occur, and also for operations that process large amounts of data or use limited resources.
Cancellation
Cancellation allows either the client (most commonly) or the server to explicitly stop the RPC before it finishes. On the client side, cancellation is usually triggered by user actions, such as closing a user interface, or by internal application logic.
Server-side cancellation occurs when the server chooses to stop processing a request. This can happen for reasons such as server overload, system failures, or invalid input.
The cancellation is communicated via the context, allowing both the client and server to monitor and respond to it.
The cancellation request is sent to the server, which then decides whether to accept it and
what actions to take upon receiving it. If the server processes the cancellation, the client
receives the status code CANCELLED (1). Due to timing considerations, this
status is only sent if the RPC is explicitly cancelled before it completes. This means it is
possible for the client to send a cancellation request but still receive a success response
for the original request.
Examples - Sample code for cancellation (client-side)
The examples simulate a cancellation after a timeout of 1 second while the process is performing something else (simulated by a delay).
/**
 * Starts "SomeFunction" asynchronously, waits one second and then tries to
 * cancel the call through its call context. A non-OK status is written to the log.
 * @param stub          Stub used to issue the RPC
 * @param clientContext Client context from which the per-call context is cloned
 * @param request       Request value handed to the RPC
 */
public void callSomeFunctionWithCancellation(shared_ptr<VrpcStub> stub, shared_ptr<VrpcClientContext> clientContext, anytype request)
{
  // The call gets its own context, cloned from the client context
  shared_ptr<VrpcClientContext> perCallContext = clientContext.clone();
  shared_ptr<callSomeFunctionWitCancellationCb> resultCb = new callSomeFunctionWitCancellationCb();

  // Issue the async RPC; the callback object is notified through onResult
  stub.callFunctionAsync("SomeFunction", request, resultCb, callSomeFunctionWitCancellationCb::onResult, perCallContext);

  // Other work would run here while the response is pending
  delay(1, 0); // Simulated with 1 second delay

  // Request cancellation (only has an effect if the call is not yet finished)
  perCallContext.tryCancel();

  // Wait until the callback has delivered the final status
  VrpcStatus finalStatus = resultCb.waitForResult();

  // Nothing to report for a successful call
  if (finalStatus.isOk())
    return;

  if (finalStatus.getStatusCode() == VrpcStatusCode::Canceled) // Verify if call was canceled
    DebugN("Request was canceled");
  else
    DebugN("Received Error Status: " + finalStatus.getStatusCode() + ", " + finalStatus.getText());
}
Required helper class for:
/**
 * Callback helper for the CTRL example: stores the status passed to
 * onResult() and lets the caller wait for it via waitForResult().
 */
class callSomeFunctionWitCancellationCb
{
  private bool m_hasResult = false;                   // set to true by onResult()
  private VrpcStatus m_result = VRPC_STATUS_UNKNOWN;  // status handed to onResult()
  private string SEM = "1_" + (string)this;           // semaphore name built from this instance

  /** Stores the received status and releases the semaphore. */
  public void onResult(VrpcStatus result)
  {
    m_result = result;
    m_hasResult = true;
    semRelease(SEM);
  }

  /** Acquires the semaphore and then returns the stored status. */
  public VrpcStatus waitForResult()
  {
    semAcquire(SEM);
    return m_result;
  }
};

/**
 * C++ example: starts "SomeFunction", keeps dispatching for one second, then
 * tries to cancel the call and reports a non-OK status through ErrHdl.
 *
 * @param stub          Stub used to issue the RPC
 * @param clientContext Client context from which the call context is cloned
 * @param request       Request value handed to the RPC
 */
void callSomeFunctionWithCancellation(std::unique_ptr<::vrpc::Stub> stub, std::shared_ptr<::vrpc::ClientContext> clientContext, const Variable& request)
{
  // Create a call context from client context
  std::shared_ptr<::vrpc::ClientContext> callContext = clientContext->clone();
  std::shared_ptr<DefaultWaitForResponse> wait = std::make_shared<DefaultWaitForResponse>();

  // Call RPC with call context
  stub->callFunctionWait("SomeFunction", request, wait, callContext);

  // Do something else while waiting for response
  // Simulated with 1 second delay: 10 iterations of 100 ms each
  for (int i = 0; i < 10; i++)
  {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    Manager::dispatch();
  }

  // Cancel the request (if not yet finished)
  callContext->tryCancel();

  // Wait for result
  while (!wait->isFinished()) { Manager::dispatch(0.05); }

  const ::vrpc::ResponseData& responseData = wait->getResponseData();
  ::vrpc::Status status = responseData.getStatus();

  if (!status.isOk())
  {
    if (status.getStatusCode() == ::vrpc::StatusCode::Canceled) // Verify if call was canceled
      ErrHdl::error(ErrClass::PRIO_SEVERE, ErrClass::ERR_IMPL, ErrClass::UNEXPECTEDSTATE, "Request was canceled");
    else
      ErrHdl::error(ErrClass::PRIO_SEVERE, ErrClass::ERR_IMPL, ErrClass::UNEXPECTEDSTATE, "Received Error: ", status.toString());
  }
}

/// <summary>
/// C# example: starts "SomeFunction" with a cancellation token that is
/// canceled after one second and logs an RpcException raised by the call.
/// </summary>
/// <param name="stub">Stub used to issue the RPC.</param>
/// <param name="request">Request value handed to the RPC.</param>
public async Task CallSomeFunctionWithCancellation(VrpcStub stub, OaVariant request)
{
  // Create cancellation token source; the using declaration disposes it when the method exits
  using CancellationTokenSource cts = new();

  try
  {
    // Call RPC with cancellation token
    var call = stub.CallFunctionAsync("SomeFunction", request, cancellationToken: cts.Token);

    // Cancel the request after 1 second
    cts.CancelAfter(TimeSpan.FromSeconds(1));

    // Do something else while waiting for response
    await Task.Delay(TimeSpan.FromSeconds(2)); // Simulated with 2 seconds delay

    // Wait for result
    await call;
  }
  catch (RpcException ex)
  {
    if (ex.StatusCode == StatusCode.Canceled) // Verify if call was canceled
      _logger.LogError("Request was canceled");
    else
      _logger.LogError($"Received Error: {ex}");
  }
}

/**
 * TypeScript example: starts 'SomeFunction' with a call context whose cancel
 * signal fires after one second and logs a Vrpc.Error raised by the call.
 *
 * @param stub - Stub used to issue the RPC
 * @param clientContext - Client context from which the call context is cloned
 * @param request - Request value handed to the RPC
 */
public async callSomeFunctionWithCancellation(
  stub: Vrpc.Stub,
  clientContext: Vrpc.ClientContext,
  request: Vrpc.Variant,
): Promise<void> {
  // Create a call context from client context
  const callContext: Vrpc.ClientContext = clientContext.clone();

  // Cancel the request after 1 second
  callContext.cancelSignal = AbortSignal.timeout(1000);

  try {
    // Call RPC with call context
    const call = stub.callFunction('SomeFunction', request, callContext);

    // Do something else while waiting for response
    await new Promise((resolve) => setTimeout(resolve, 2000)); // Simulated with 2 second delay

    // Wait for result
    await call;
  } catch (error) {
    if (error instanceof Vrpc.Error) {
      if (error.status.statusCode === Vrpc.StatusCode.Canceled) {
        // Verify if call was canceled
        console.error('Request was canceled');
      } else {
        // Any other status is reported as a generic error
        console.error(
          `Received Error: ${error.status.statusCode}, ${error.status.text}`,
        );
      }
    }
  }
}
Examples - Sample code for cancellation (server-side)
/**
 * CTRL example of a service method that checks the server context for
 * cancellation before, during and after its work.
 * @param serverContext Server context of the current call
 * @param request       Request value received from the client
 * @return The string "MyResult" when the call was not canceled
 */
public anytype someServiceMethodImplWithCancellation(VrpcServerContext &serverContext, anytype request)
{
  // Check if the request has already been canceled at the start
  if (serverContext.isCanceled())
    vrpcThrow(VrpcStatusCode::Canceled, "Request was canceled");

  //...

  // break long running operations when canceled, e.g.
  do
  {
    //...
  }
  while (!serverContext.isCanceled());

  //...

  // Periodically check for cancellation if doing long processing
  if (serverContext.isCanceled())
    vrpcThrow(VrpcStatusCode::Canceled, "Request was canceled");

  return "MyResult";
}

/**
 * C++ example of a service method that checks the server context for
 * cancellation and returns a Canceled status when the call was canceled.
 *
 * @param serverContext Server context of the current call
 * @param request       Request value received from the client
 * @param response      Set to a newly allocated TextVar on success
 * @return Canceled status if the call was canceled, otherwise OK
 */
::vrpc::Status someServiceMethodImplWithCancellation(std::shared_ptr<::vrpc::ServerContext> serverContext, const Variable* request, Variable*& response)
{
  // Check if the request has already been canceled at the start
  if (serverContext->isCanceled())
    return ::vrpc::Status(::vrpc::StatusCode::Canceled, "Request was canceled");

  //...

  // break long running operations when canceled, e.g.
  do
  {
    //...
  }
  while (!serverContext->isCanceled());

  //...

  // May use callback for cancellation; the returned cookie is used to unregister it again
  size_t cancelCbCookie = serverContext->registerOnCanceledCb([](){ /* ... */ });
  serverContext->unregisterOnCanceledCb(cancelCbCookie);

  //...

  // Periodically check for cancellation if doing long processing
  if (serverContext->isCanceled())
    return ::vrpc::Status(::vrpc::StatusCode::Canceled, "Request was canceled");

  response = new TextVar("MyResult");
  return ::vrpc::Status::OK;
}

/// <summary>
/// C# example of a service method that observes the cancellation token of
/// the server call context.
/// </summary>
/// <param name="request">Request value received from the client.</param>
/// <param name="serverContext">Server call context providing the cancellation token.</param>
/// <returns>An OaVariant holding "MyResult" when the call was not canceled.</returns>
public async Task<OaVariant> SomeServiceMethodImplWithCancellation(OaVariant request, ServerCallContext serverContext)
{
  CancellationToken cancellationToken = serverContext.CancellationToken;

  // Check if the request has already been canceled at the start
  cancellationToken.ThrowIfCancellationRequested();

  //...

  // Use token in async calls
  await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);

  //...

  // Periodically check for cancellation if doing long processing
  cancellationToken.ThrowIfCancellationRequested();

  return new OaVariant("MyResult");
}

/**
 * TypeScript example of a service method that observes the cancel signal of
 * the server context.
 *
 * @param serverContext - Server context providing the cancel signal
 * @param request - Request value received from the client
 * @returns A string variant holding 'MyResult' when the call was not canceled
 */
private async someServiceMethodImplWithCancellation(
  serverContext: Vrpc.ServerContext,
  request: Vrpc.Variant,
): Promise<Vrpc.Variant> {
  const cancelSignal = serverContext.cancelSignal;

  // Check if the request has already been canceled at the start
  cancelSignal.throwIfAborted();

  //...

  // Use signal in async calls: a 1 second delay that is aborted on cancellation
  await new Promise<void>((resolve, reject) => {
    const timeout = setTimeout(() => {
      // Delay elapsed: stop listening for abort and complete the promise
      cancelSignal.removeEventListener('abort', onAbort);
      resolve();
    }, 1000);

    const onAbort = (): void => {
      clearTimeout(timeout); // Clear the timeout to cancel the delay
      reject(new Error('Operation canceled')); // Reject the promise
    };

    // Listen for the abort signal; the listener is removed after it has fired once
    cancelSignal.addEventListener('abort', onAbort, { once: true });
  });

  //...

  // Periodically check for cancellation if doing long processing
  cancelSignal.throwIfAborted();

  return Vrpc.Variant.createString('MyResult');
}
Deadline
A deadline specifies the point in time by which an RPC must be completed. If the deadline
is exceeded before the RPC completes, the status code DEADLINE_EXCEEDED (4) is returned to
the client. This status is sent to the client whether the server was still processing the
request or had already stopped because the deadline was reached. The deadline is also
considered on the server: when the deadline is exceeded, the call on the server is cancelled.
Examples
/**
 * CTRL example: calls "SomeFunction" with a deadline two seconds from now and
 * logs the exception raised when the call fails.
 * @param stub          Stub used to issue the RPC
 * @param clientContext Client context from which the call context is cloned
 * @param request       Request value handed to the RPC
 */
public void callSomeFunctionWithDeadline(shared_ptr<VrpcStub> stub, shared_ptr<VrpcClientContext> clientContext, anytype request)
{
  // Create a call context from client context
  shared_ptr<VrpcClientContext> callContext = clientContext.clone();

  // Set deadline on the call context that is passed to the RPC below
  time deadline = getCurrentTime() + 2.0; // 2 seconds
  callContext.setDeadline(deadline);

  try
  {
    // Call RPC with call context
    stub.callFunction("SomeFunction", request, callContext);
  }
  catch
  {
    VrpcException ex = vrpcGetLastException();

    if (ex.getStatus().getStatusCode() == VrpcStatusCode::DeadlineExceeded) // Verify if deadline expired
      DebugN("Request timed out");
    else
      DebugN("Received Error: " + ex.toString());
  }
}

/**
 * C++ example: calls "SomeFunction" with a deadline two seconds from now,
 * dispatches until the response is finished and reports a non-OK status
 * through ErrHdl.
 *
 * @param stub          Stub used to issue the RPC
 * @param clientContext Client context from which the call context is cloned
 * @param request       Request value handed to the RPC
 */
void callSomeFunctionWithDeadline(std::unique_ptr<::vrpc::Stub> stub, std::shared_ptr<::vrpc::ClientContext> clientContext, const Variable& request)
{
  // Create a call context from client context
  std::shared_ptr<::vrpc::ClientContext> callContext = clientContext->clone();

  // Set deadline
  PVSSTime deadline = PVSSTime::getSystemTime() + PVSSTime(2, 0); // 2 seconds
  callContext->setDeadline(deadline);

  std::shared_ptr<DefaultWaitForResponse> wait = std::make_shared<DefaultWaitForResponse>();

  // Call RPC with call context
  stub->callFunctionWait("SomeFunction", request, wait, callContext);

  // Keep dispatching until the response is finished
  while (!wait->isFinished()) { Manager::dispatch(0.05); }

  const ::vrpc::ResponseData& responseData = wait->getResponseData();
  ::vrpc::Status status = responseData.getStatus();

  if (!status.isOk())
  {
    if (status.getStatusCode() == ::vrpc::StatusCode::DeadlineExceeded) // Verify if deadline expired
      ErrHdl::error(ErrClass::PRIO_SEVERE, ErrClass::ERR_IMPL, ErrClass::UNEXPECTEDSTATE, "Request timed out");
    else
      ErrHdl::error(ErrClass::PRIO_SEVERE, ErrClass::ERR_IMPL, ErrClass::UNEXPECTEDSTATE, "Received Error: ", status.toString());
  }
}

/// <summary>
/// C# example: calls "SomeFunction" with a deadline two seconds from now and
/// logs an RpcException raised by the call.
/// </summary>
/// <param name="stub">Stub used to issue the RPC.</param>
/// <param name="request">Request value handed to the RPC.</param>
public async Task CallSomeFunctionWithDeadline(VrpcStub stub, OaVariant request)
{
  // Define deadline
  DateTime deadline = DateTime.UtcNow + TimeSpan.FromSeconds(2); // 2 seconds

  try
  {
    // Call RPC with deadline
    await stub.CallFunctionAsync("SomeFunction", request, deadline: deadline);
  }
  catch (RpcException ex)
  {
    if (ex.StatusCode == StatusCode.DeadlineExceeded) // Verify if deadline expired
      _logger.LogError("Request timed out");
    else
      _logger.LogError($"Received Error: {ex}");
  }
}

/**
 * TypeScript example: calls 'SomeFunction' with a deadline two seconds from
 * now and logs a Vrpc.Error raised by the call.
 *
 * @param stub - Stub used to issue the RPC
 * @param clientContext - Client context from which the call context is cloned
 * @param request - Request value handed to the RPC
 */
public async callSomeFunctionWithDeadline(
  stub: Vrpc.Stub,
  clientContext: Vrpc.ClientContext,
  request: Vrpc.Variant,
): Promise<void> {
  // Create a call context from client context
  const callContext: Vrpc.ClientContext = clientContext.clone();

  // Set deadline
  const deadline: WinccoaTime = Date.now() + 2000; // 2 seconds
  callContext.deadline = deadline;

  try {
    // Call RPC with call context
    await stub.callFunction('SomeFunction', request, callContext);
  } catch (error) {
    if (error instanceof Vrpc.Error) {
      if (error.status.statusCode == Vrpc.StatusCode.DeadlineExceeded)
        // Verify if deadline expired
        console.error('Request timed out');
      else
        console.error(`Received Error: ${error.status.statusCode}, ${error.status.text}`);
    }
  }
}
