
How to subscribe to events?

Introduction​

In some cases, you may want to react when data is created or updated in Cardinal. Because this data is encrypted, the code that reacts to it cannot be hosted in Cardinal. Instead, we provide the tools to implement a microservice or backend that subscribes to these events and reacts accordingly.

Example use cases​

  • Analyze a measurement taken on a patient using a machine learning model
  • Send an email to a patient when a new appointment is created
  • Update a patient's status in an external system when their health status changes
  • etc.

Setup​

You will need to host a service that subscribes to these events using the Cardinal SDK. The service must have access to the data carried by the events, so you will need to share the relevant health data with the user that the service will use.

In this example, we will focus on the code used to subscribe to events. Let's start by creating a service that subscribes to the creation of new HealthElement objects with the tag INTERNAL_INFERENCE_STATUS having the value TO_BE_STARTED and logs them in the console.

The subscribeToEvents method allows you to subscribe to the creation events of a HealthElement. In this example, we filter the events to receive only those with the tag type INTERNAL_INFERENCE_STATUS and the tag code TO_BE_STARTED.

The method returns a subscription whose eventChannel property is a ReceiveChannel from which the events can be consumed.

import com.icure.cardinal.sdk.filters.HealthElementFilters
import com.icure.cardinal.sdk.subscription.EntitySubscriptionConfiguration
import com.icure.cardinal.sdk.subscription.SubscriptionEventType
import kotlinx.coroutines.channels.consumeEach
import kotlin.time.Duration.Companion.seconds

// Subscribe to creation events of HealthElements tagged INTERNAL_INFERENCE_STATUS / TO_BE_STARTED
sdk.healthElement.subscribeToEvents(
    setOf(SubscriptionEventType.Create),
    HealthElementFilters.byTagForSelf(
        tagType = "INTERNAL_INFERENCE_STATUS",
        tagCode = "TO_BE_STARTED",
    ),
    EntitySubscriptionConfiguration(
        channelBufferCapacity = 100,
        onBufferFull = EntitySubscriptionConfiguration.FullBufferBehaviour.Close,
        reconnectionDelay = 2.seconds,
        retryDelayExponentFactor = 2.0,
        connectionMaxRetries = 5,
    )
).eventChannel.consumeEach {
    // Every event read from the channel is logged, including connection events
    println("HealthElement created: $it")
}
note

This code needs to be executed in a coroutine context. The part managing this has been omitted for simplicity.
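As an illustration of the omitted part, here is a minimal sketch of consuming a channel from a coroutine. It does not use the Cardinal SDK: a plain kotlinx.coroutines Channel of strings stands in for the subscription's eventChannel, and the consumeAndLog function is a hypothetical helper introduced only for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: consumes the channel until it is closed and logs each event.
suspend fun consumeAndLog(events: ReceiveChannel<String>, log: (String) -> Unit) {
    events.consumeEach { log("HealthElement created: $it") }
}

fun main() = runBlocking {
    // Stand-in for subscription.eventChannel (assumption: a real service gets it from the SDK).
    val events = Channel<String>(capacity = 100)

    // The consumer runs in its own coroutine so that it does not block the producer.
    val consumer = launch { consumeAndLog(events) { println(it) } }

    events.send("he-1")
    events.send("he-2")
    events.close()  // closing the channel ends consumeEach
    consumer.join() // wait until all buffered events have been logged
}
```

In a long-running service, the consumer would typically be launched in an application-level CoroutineScope rather than inside runBlocking.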

info

All events received by the SDK are encrypted. Therefore, you will need to decrypt them before processing.

Some explanations​

Buffered events​

We have completely abstracted the various concepts used to establish the connection and consume events. The goal is to provide you with a tool that allows you to react to events without worrying about setting up a WebSocket connection, handling connection errors, etc.

Events are sent by the Cardinal backend, and each event is added to a buffer when it is received. You can specify the size of this buffer (channelBufferCapacity) and the behavior to adopt if the buffer is full (onBufferFull).

This buffer contains events that have not yet been consumed. An event can be:

  • A connection event (emitted once during the initial connection)
  • An error event:
    • An error event due to a missed ping from the server
    • An error event due to backend closure
    • An unexpected error event
  • A reconnection event (automatically initiated in case of a non-fatal error)
  • An event with an entity (Service, Contact, etc.) that has been created or modified (depending on your filter)
  • A deserialization error event (if an event cannot be deserialized)

This mechanism allows you to continue consuming events even if the connection is lost, without losing those already collected before the connection loss.
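The following sketch illustrates the buffering idea with a plain kotlinx.coroutines Channel. It is not the SDK's implementation: the offerAll helper and the string events are hypothetical. It shows that a bounded buffer rejects events once it is full (the situation in which the configured onBufferFull behavior would apply) and that events already buffered can still be consumed after the producing side has closed the channel.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: tries to buffer every event without suspending
// and returns the events that did not fit in the buffer.
fun offerAll(buffer: Channel<String>, events: List<String>): List<String> =
    events.filter { !buffer.trySend(it).isSuccess }

fun main() = runBlocking {
    val buffer = Channel<String>(capacity = 2)

    // The third event does not fit because nothing has been consumed yet.
    val rejected = offerAll(buffer, listOf("event-1", "event-2", "event-3"))
    println("rejected: $rejected")

    // Simulates the producing side going away (for example, a lost connection).
    buffer.close()

    // Events buffered before the close can still be consumed.
    for (event in buffer) println("consumed: $event")
}
```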

We will provide a more complete example later in this document.

Ping mechanism​

The Cardinal SDK uses a ping mechanism to check the connection with the backend. The backend starts this ping/pong exchange after receiving the subscription message from the client.

The client must respond to this ping with a pong within a given time window. If the client does not respond, the backend considers the connection lost and closes the connection. Conversely, the client expects to receive a ping from the backend within a given time window. If the client does not receive a ping, it considers the connection lost and closes the connection.

In both cases, the client automatically attempts to reconnect. You can configure the reconnection delay (reconnectionDelay), the reconnection delay exponent factor (retryDelayExponentFactor), and the maximum number of reconnection attempts (connectionMaxRetries).

If the connection cannot be restored after the maximum number of reconnection attempts, a close event is emitted with the reason for the closure.
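To get a feel for how these three settings interact, here is a small sketch that computes a retry schedule. The formula is an assumption, not something confirmed by the SDK: it supposes that the delay before attempt n (starting at 0) is the reconnection delay multiplied by the exponent factor raised to the power n. The retryDelays function is hypothetical.

```kotlin
import kotlin.math.pow
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Assumed formula (not taken from the SDK): delay(n) = reconnectionDelay * factor^n, n = 0, 1, ...
fun retryDelays(reconnectionDelay: Duration, factor: Double, maxRetries: Int): List<Duration> =
    (0 until maxRetries).map { attempt -> reconnectionDelay * factor.pow(attempt) }

fun main() {
    // Same values as in the configuration used on this page.
    retryDelays(reconnectionDelay = 2.seconds, factor = 2.0, maxRetries = 5)
        .forEachIndexed { index, delay -> println("attempt ${index + 1}: wait $delay") }
}
```

Under this assumption, a 2-second delay with a factor of 2.0 and 5 attempts gives waits of 2, 4, 8, 16, and 32 seconds, so roughly one minute passes before the connection is considered lost.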

Close reasons​

Possible close reasons are as follows:

  • ChannelFull: The buffer is full, and the onBufferFull behavior is set to Close
  • ConnectionLost: The connection was lost, and the maximum number of reconnection attempts was reached
  • IntentionallyClosed: The connection was intentionally closed by the client

When the maximum number of reconnection attempts is reached, the connection is closed. You can check the reason for the closure and react accordingly.

import com.icure.cardinal.sdk.subscription.EntitySubscriptionCloseReason

// `subscription` is the value returned by subscribeToEvents
subscription.closeReason?.let {
    when (it) {
        EntitySubscriptionCloseReason.ChannelFull -> println("Channel full")
        EntitySubscriptionCloseReason.ConnectionLost -> println("Connection lost")
        EntitySubscriptionCloseReason.IntentionallyClosed -> println("Subscription closed")
    }
}

Advanced usage​

Now that we have covered the basics, we can look at how to handle the different error cases so that your service can react accordingly.

Recover from a connection error​

During a connection issue, the SDK automatically attempts to reconnect. You can respond to the Reconnected event upon receiving it.

The recovery strategy shown here is an illustration and should not be copied as-is into a production environment. You will need to adapt it to your environment and to the way you consume events. Feel free to reach out to us to discuss your use case and the best way to get back on track after a reconnection.

In our case, the approach is as follows: upon the Connected or Reconnected event, we attempt to retrieve the missed events, a process handled by the getMissedEvents / get_missed_event method.

getMissedEvents / get_missed_event retrieves the missed events by querying with filterHealthElementsBy / filter_health_elements_by_blocking, using the same filter as the subscription.

Each retrieved element is added to a queue for handling. Once an element has been processed, its INTERNAL_INFERENCE_STATUS tag is updated to the value DONE, so it no longer matches the initial filter.

At application startup, we retrieve the missed events and process them. We then wait for new events and process them as they come. If a reconnection event is received, we retrieve and process the missed events.

In this example, the number of reconnection attempts is limited to 5. If the limit is reached, the connection will be closed.

import com.icure.cardinal.sdk.filters.HealthElementFilters
import com.icure.cardinal.sdk.subscription.EntitySubscriptionConfiguration
import com.icure.cardinal.sdk.subscription.EntitySubscriptionEvent
import com.icure.cardinal.sdk.subscription.SubscriptionEventType
import com.icure.cardinal.sdk.utils.pagination.forEach
import kotlinx.coroutines.channels.consumeEach
import kotlin.time.Duration.Companion.seconds

// Retrieves every HealthElement that still matches the filter and queues it for processing.
// addToQueueToProcess is your own function; it is not part of the SDK.
suspend fun getMissedEvents() {
    sdk.healthElement.filterHealthElementsBy(
        HealthElementFilters.byTagForSelf(
            tagType = "INTERNAL_INFERENCE_STATUS",
            tagCode = "TO_BE_STARTED",
        )
    ).forEach {
        addToQueueToProcess(it)
    }
}

sdk.healthElement.subscribeToEvents(
    setOf(SubscriptionEventType.Create),
    HealthElementFilters.byTagForSelf(
        tagType = "INTERNAL_INFERENCE_STATUS",
        tagCode = "TO_BE_STARTED",
    ),
    EntitySubscriptionConfiguration(
        channelBufferCapacity = 100,
        onBufferFull = EntitySubscriptionConfiguration.FullBufferBehaviour.Close,
        reconnectionDelay = 2.seconds,
        retryDelayExponentFactor = 2.0,
        connectionMaxRetries = 5,
    )
).eventChannel.consumeEach {
    when (it) {
        is EntitySubscriptionEvent.Connected -> {
            // Initial connection: catch up on elements created before the service started
            println("Starting listening for events, processing missed events")
            getMissedEvents()
        }
        is EntitySubscriptionEvent.Reconnected -> {
            // Connection restored: catch up on elements created while disconnected
            println("Reconnected, starting to process missed events")
            getMissedEvents()
        }
        is EntitySubscriptionEvent.EntityNotification -> {
            // The entity received here is encrypted; decrypt it before processing if needed:
            // addToQueueToProcess(sdk.healthElement.decrypt(it.entity))
            addToQueueToProcess(it.entity)
        }
        else -> println("Unexpected event: $it")
    }
}
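The addToQueueToProcess function is left to your service. As one possible shape, here is a hedged sketch of a processing queue built on a kotlinx.coroutines Channel. Everything in it is hypothetical: Item stands in for a HealthElement (only its id is used), and ProcessingQueue is not an SDK class. It also skips ids that were already queued, since missed-event recovery and live notifications can deliver the same element twice.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a HealthElement; only the id matters for this sketch.
data class Item(val id: String)

// Hypothetical queue: not part of the SDK.
class ProcessingQueue(private val handle: suspend (Item) -> Unit) {
    private val queue = Channel<Item>(Channel.UNLIMITED)

    // Not thread-safe: assumes all calls come from a single thread, as in main() below.
    private val seen = mutableSetOf<String>()

    // Queues the item unless its id was already queued; returns true if it was queued.
    fun addToQueueToProcess(item: Item): Boolean {
        if (!seen.add(item.id)) return false
        return queue.trySend(item).isSuccess
    }

    // Processes queued items one by one until the queue is closed and drained.
    suspend fun run() {
        for (item in queue) handle(item)
    }

    fun close() {
        queue.close()
    }
}

fun main() = runBlocking {
    val processed = mutableListOf<String>()
    val queue = ProcessingQueue { processed += it.id }
    val worker = launch { queue.run() }

    queue.addToQueueToProcess(Item("a"))
    queue.addToQueueToProcess(Item("a")) // duplicate id, skipped
    queue.addToQueueToProcess(Item("b"))

    queue.close()
    worker.join()
    println(processed)
}
```

Keeping the set of seen ids in memory is only suitable for a sketch; a real service would need to bound it or rely on the tag update to DONE described above.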