Cancellation and Deadline

Cancellation and deadline are mechanisms that help manage the lifecycle of Remote Procedure Calls.

They are especially useful in distributed systems, where you may encounter network delays, failures, or operations that take a long time to complete, but also in cases of operations that deal with big data or use limited resources.

Cancellation

Cancellation allows either the client (most commonly) or the server to explicitly stop the RPC before it finishes. On the client side, cancellation is usually triggered by user actions, such as closing a user interface, or by internal application logic.

Server-side cancellation occurs when the server chooses to stop processing a request. This can happen for reasons such as server overload, system failures, or invalid input.

The cancellation is communicated via the context, allowing both the client and server to monitor and respond to it.

The cancellation request is sent to the server, which then decides whether to accept it and what actions to take upon receiving it. If the server processes the cancellation, the client receives the status code CANCELLED (1). Due to timing considerations, this status is only sent if the RPC is explicitly cancelled before it completes. This means it is possible for the client to send a cancellation request but still receive a success response for the original request.

Examples - Sample code for cancellation (client-side)

The examples simulate a cancellation after a timeout of 1 second while the process is performing something else (simulated by a delay).

Note:
Cancellation only makes sense in asynchronous scenarios, so an asynchronous API is used in the stub.
  public void callSomeFunctionWithCancellation(shared_ptr<VrpcStub> stub, shared_ptr<VrpcClientContext> clientContext, anytype request)
  {
    // Create a call context from client context
    shared_ptr<VrpcClientContext> callContext = clientContext.clone();
    
    shared_ptr<callSomeFunctionWitCancellationCb> cb = new callSomeFunctionWitCancellationCb();

    // Call async RPC with call context and callback
    stub.callFunctionAsync("SomeFunction", request, cb, callSomeFunctionWitCancellationCb::onResult, callContext);

    // Do something else while waiting for response
    delay(1, 0); // Simulated with 1 second delay

    // Cancel the request (if not yet finished)
    callContext.tryCancel();

    // Wait for result
    VrpcStatus status = cb.waitForResult();

    if (!status.isOk())
    {
      if (status.getStatusCode() == VrpcStatusCode::Canceled)  // Verify if call was canceled
        DebugN("Request was canceled");
      else
        DebugN("Received Error Status: " + status.getStatusCode() + ", " + status.getText());
    }
  }

Required helper class for:

class callSomeFunctionWitCancellationCb
{
  private bool m_hasResult = false;
  private VrpcStatus m_result = VRPC_STATUS_UNKNOWN;
  private string SEM = "1_" + (string)this;

  public void onResult(VrpcStatus result)
  {
    m_result = result;
    m_hasResult = true;
    semRelease(SEM);
  }

  public VrpcStatus waitForResult()
  {
    semAcquire(SEM);
    return m_result;
  }
};
void callSomeFunctionWithCancellation(std::unique_ptr<::vrpc::Stub> stub, std::shared_ptr<::vrpc::ClientContext> clientContext, const Variable& request)
  {
    // Create a call context from client context
    std::shared_ptr<::vrpc::ClientContext>callContext = clientContext->clone();

    std::shared_ptr<DefaultWaitForResponse> wait = std::make_shared<DefaultWaitForResponse>();

    // Call RPC with call context
    stub->callFunctionWait("SomeFunction", request, wait, callContext);

    // Do something else while waiting for response
    // Simulated with 1 second delay
    for (int i = 0; i <= 10; i++)
    {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      Manager::dispatch();
    }

    // Cancel the request (if not yet finished)
    callContext->tryCancel();

    // Wait for result
    while (!wait->isFinished()) { Manager::dispatch(0.05); }
    const ::vrpc::ResponseData& responseData = wait->getResponseData();

    ::vrpc::Status status = responseData.getStatus();
    if (!status.isOk())
    {
      if (status.getStatusCode() == ::vrpc::StatusCode::Canceled)  // Verify if call was canceled
        ErrHdl::error(ErrClass::PRIO_SEVERE, ErrClass::ERR_IMPL, ErrClass::UNEXPECTEDSTATE, "Request was canceled");
      else
        ErrHdl::error(ErrClass::PRIO_SEVERE, ErrClass::ERR_IMPL, ErrClass::UNEXPECTEDSTATE, "Received Error: ", status.toString());
    }
  }              
  public async Task CallSomeFunctionWithCancellation(VrpcStub stub, OaVariant request)
  {
    // Create cancellation token source
    CancellationTokenSource cts = new();

    try
    {
      // Call RPC with cancellation token
      var call = stub.CallFunctionAsync("SomeFunction", request, cancellationToken: cts.Token);

      // Cancel the request after 1 second
      cts.CancelAfter(TimeSpan.FromSeconds(1));

      // Do something else while waiting for response
      await Task.Delay(TimeSpan.FromSeconds(2)); // Simulated with 2 seconds delay

      // Wait for result
      await call;
    }
    catch (RpcException ex)
    {
      if (ex.StatusCode == StatusCode.Canceled)  // Verify if call was canceled
        _logger.LogError("Request was canceled");
      else
        _logger.LogError($"Received Error: {ex}");
    }
  }              
  public async callSomeFunctionWithCancellation(
    stub: Vrpc.Stub,
    clientContext: Vrpc.ClientContext,
    request: Vrpc.Variant,
  ): Promise<void> {
    // Create a call context from client context
    const callContext: Vrpc.ClientContext = clientContext.clone();

    // Cancel the request after 1 second
    callContext.cancelSignal = AbortSignal.timeout(1000);

    try {
      // Call RPC with call context
      const call = stub.callFunction('SomeFunction', request, callContext);

      // Do something else while waiting for response
      await new Promise((resolve) => setTimeout(resolve, 2000)); // Simulated with 2 second delay

      // Wait for result
      await call;
    } catch (error) {
      if (error instanceof Vrpc.Error) {
        if (error.status.statusCode == Vrpc.StatusCode.Canceled)
          // Verify if call was canceled
          console.error('Request was canceled');
        console.error(
          `Received Error: ${error.status.statusCode}, ${error.status.text}`,
        );
      }
    }
  }

Examples - Sample code for cancellation (server-side)

  public anytype someServiceMethodImplWithCancellation(VrpcServerContext &serverContext, anytype request)
  {
    // Check if the request has already been canceled at the start
    if (serverContext.isCanceled())
      vrpcThrow(VrpcStatusCode::Canceled, "Request was canceled");

    //...

    // break long running operations when canceled, e.g.
    do
    {
      //...
    }
    while (!serverContext.isCanceled());

    //...

    // Periodically check for cancellation if doing long processing
    if (serverContext.isCanceled())
      vrpcThrow(VrpcStatusCode::Canceled, "Request was canceled");

    return "MyResult";
  }
  ::vrpc::Status someServiceMethodImplWithCancellation(std::shared_ptr<::vrpc::ServerContext> serverContext, const Variable* request, Variable*& response)
  {
    // Check if the request has already been canceled at the start
    if (serverContext->isCanceled())
      return ::vrpc::Status(::vrpc::StatusCode::Canceled, "Request was canceled");

    //...

    // break long running operations when canceled, e.g.
    do
    {
      
    }
    while (!serverContext->isCanceled());

    //...

    // May use callback for cancellation
    size_t cancelCbCookie = serverContext->registerOnCanceledCb([](){ /* ... */ });
    serverContext->unregisterOnCanceledCb(cancelCbCookie);

    //...

    // Periodically check for cancellation if doing long processing
    if (serverContext->isCanceled())
      return ::vrpc::Status(::vrpc::StatusCode::Canceled, "Request was canceled");


    response = new TextVar("MyResult");
    return vrpc::Status::OK;
  }               
  public async Task<OaVariant> SomeServiceMethodImplWithCancellation(OaVariant request, ServerCallContext serverContext)
  {
    CancellationToken cancellationToken = serverContext.CancellationToken;

    // Check if the request has already been canceled at the start
    serverContext.CancellationToken.ThrowIfCancellationRequested();

    //...

    // Use token in async calls
    await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);

    //...

    // Periodically check for cancellation if doing long processing
    cancellationToken.ThrowIfCancellationRequested();

    return new OaVariant("MyResult");
  }              
  private async someServiceMethodImplWithCancellation(
    serverContext: Vrpc.ServerContext,
    request: Vrpc.Variant,
  ): Promise<Vrpc.Variant> {
    const cancelSignal = serverContext.cancelSignal;

    // Check if the request has already been canceled at the start
    cancelSignal.throwIfAborted();

    //...

    // Use signal in async calls
    await new Promise((resolve, reject) => {
      const timeout = setTimeout(() => resolve, 1000);

      // Listen for the abort signal
      serverContext.cancelSignal.addEventListener('abort', () => {
        clearTimeout(timeout); // Clear the timeout to cancel the delay
        reject(new Error('Operation canceled')); // Reject the promise
      });
    });

    //...

    // Periodically check for cancellation if doing long processing
    cancelSignal.throwIfAborted();

    return Vrpc.Variant.createString('MyResult');
  }

Deadline

A deadline is a time limit set by the client that specifies how long it is willing to wait for the server to complete the RPC. If the server does not respond within this time frame, the call is automatically terminated. When the deadline is exceeded, the status code DEADLINE_EXCEEDED (4) is returned to the client. This status is sent to the client whether the server was still processing the request or had already stopped because the deadline was reached. The deadline is also considered on the server. When the deadline is exceeded, the call on the server will be cancelled.
Note:
As the manager execution, including service and client implementation and MSA messaging, is internally based on some timing aspect, it could be that the cancellation on the server, including sending the cancel response, is faster than the deadline handling in the client. This means the client code receives the cancel error from the service before the deadline error of the client.

Examples

   public void callSomeFunctionWithDeadline(shared_ptr<VrpcStub> stub, shared_ptr<VrpcClientContext> clientContext, anytype request)
  {
    // Create a call context from client context
    shared_ptr<VrpcClientContext> callContext = clientContext.clone();

    // Set deadline
    time deadline = getCurrentTime() + 2.0; // 2 seconds
    context.setDeadline(deadline);

    try
    {
      // Call RPC with call context
      stub.callFunction("SomeFunction", request, callContext);
    }
    catch
    {
      VrpcException ex = vrpcGetLastException();
      if (ex.getStatus().getStatusCode() == VrpcStatusCode::DeadlineExceeded)  // Verify if deadline expired
        DebugN("Request timed out");
      else
        DebugN("Received Error: " + ex.toString());
    }
  }
  void callSomeFunctionWithDeadline(std::unique_ptr<::vrpc::Stub> stub, std::shared_ptr<::vrpc::ClientContext> clientContext, const Variable& request)
  {
    // Create a call context from client context
    std::shared_ptr<::vrpc::ClientContext>callContext = clientContext->clone();

    // Set deadline
    PVSSTime deadline = PVSSTime::getSystemTime() + PVSSTime(2, 0);  // 2 seconds
    callContext->setDeadline(deadline);

    std::shared_ptr<DefaultWaitForResponse> wait = std::make_shared<DefaultWaitForResponse>();

    // Call RPC with call context
    stub->callFunctionWait("SomeFunction", request, wait, callContext);

    while (!wait->isFinished()) { Manager::dispatch(0.05); }
    const ::vrpc::ResponseData& responseData = wait->getResponseData();

    ::vrpc::Status status = responseData.getStatus();
    if (!status.isOk())
    {
      if (status.getStatusCode() == ::vrpc::StatusCode::DeadlineExceeded)  // Verify if deadline expired
        ErrHdl::error(ErrClass::PRIO_SEVERE, ErrClass::ERR_IMPL, ErrClass::UNEXPECTEDSTATE, "Request timed out");
      else
        ErrHdl::error(ErrClass::PRIO_SEVERE, ErrClass::ERR_IMPL, ErrClass::UNEXPECTEDSTATE, "Received Error: ", status.toString());
    }
  }                 
  public async Task CallSomeFunctionWithDeadline(VrpcStub stub, OaVariant request)
  {
    // Define deadline
    DateTime deadline = DateTime.UtcNow + TimeSpan.FromSeconds(2); // 2 seconds

    try
    {
      // Call RPC with deadline
      await stub.CallFunctionAsync("SomeFunction", request, deadline: deadline);
    }
    catch (RpcException ex)
    {
      if (ex.StatusCode == StatusCode.DeadlineExceeded)  // Verify if deadline expired
        _logger.LogError("Request timed out");
      else
        _logger.LogError($"Received Error: {ex}");
    }
  }            
  public async callSomeFunctionWithDeadline(
    stub: Vrpc.Stub,
    clientContext: Vrpc.ClientContext,
    request: Vrpc.Variant,
  ): Promise<void> {
    // Create a call context from client context
    const callContext: Vrpc.ClientContext = clientContext.clone();

    // Set deadline
    const deadline: WinccoaTime = Date.now() + 2000; // 2 seconds
    callContext.deadline = deadline;

    try {
      // Call RPC with call context
      await stub.callFunction('SomeFunction', request, callContext);
    } catch (error) {
      if (error instanceof Vrpc.Error) {
        if (error.status.statusCode == Vrpc.StatusCode.DeadlineExceeded)
          // Verify if deadline expired
          console.error('Request timed out');
        else
          console.error(`Received Error: ${error.status.statusCode}, ${error.status.text}`);
      }
    }
  }