How to subscribe to events ?
Introduction​
In some cases, you may want to react to the creation or update of data in Cardinal. Since this data is encrypted, we cannot host this code in Cardinal. That's why we provide you with the tools to implement a microservice/backend that can subscribe to these different events and react accordingly.
Example use cases​
- Analyze a measurement taken on a patient using a machine learning model
- Send an email to a patient when a new appointment is created
- Update a patient's status in an external system when their health status changes
- etc.
Setup​
You will need to host a service that can subscribe to these events using the Cardinal SDK. The service in question must have access to the events, so you will need to share health data with the user that will be used by the service.
In this example, we will focus on the code used to subscribe to events. Let's start by creating a service
that subscribes to the creation of new HealthElement
objects with the tag INTERNAL_INFERENCE_STATUS
having the value
TO_BE_STARTED
and logs them in the console.
- Kotlin
- Typescript
- Python
The subscribeToEvents
method allows you to subscribe to the creation events of an
HealthElement
. In this example, we filter the events to receive only those with the tag type INTERNAL_INFERENCE_STATUS
and the tag code TO_BE_STARTED
.
This method returns a ReceiveChannel
that allows us to consume the events.
import com.icure.cardinal.sdk.filters.ServiceFilters
import com.icure.cardinal.sdk.subscription.EntitySubscriptionConfiguration
import kotlinx.coroutines.channels.consumeEach
sdk.healthElement.subscribeToEvents (
setOf(SubscriptionEventType.Create),
HealthElementFilters.byTagForSelf(
tagType = "INTERNAL_INFERENCE_STATUS",
tagCode = "TO_BE_STARTED",
),
EntitySubscriptionConfiguration(
channelBufferCapacity = 100,
onBufferFull = EntitySubscriptionConfiguration.FullBufferBehaviour.Close,
reconnectionDelay = 2.seconds,
retryDelayExponentFactor = 2.0,
connectionMaxRetries = 5,
)
).eventChannel.consumeEach {
println("HealthElement created: $it")
}
This code needs to be executed in a coroutine context. The part managing this has been omitted for simplicity.
The subscribeToEvents
method allows you to subscribe to the creation events of an
HealthElement
. In this example, we filter the events to receive only those with the tag type INTERNAL_INFERENCE_STATUS
and the tag code TO_BE_STARTED
.
This method returns an EntitySubscription
that allows us to consume the events.
To consume the events, we use the waitForEvent
method, which lets us wait for an event for a certain time. If no event
is received, the method returns null
.
We loop as long as the EntitySubscription
is not closed.
import {
EntitySubscriptionConfiguration,
ServiceFilters,
} from "@icure/cardinal-sdk";
const subscription = await sdk.healthElement.subscribeToEvents(
[SubscriptionEventType.Create],
HealthElementFilters.byTagForSelf(
'INTERNAL_INFERENCE_STATUS',
{
tagCode: 'TO_BE_STARTED',
},
),
{
subscriptionConfig: new EntitySubscriptionConfiguration(
{
channelBufferCapacity: 100,
onBufferFull: EntitySubscriptionConfiguration.FullBufferBehaviour.Close,
reconnectionDelay: 2 * 1000,
retryDelayExponentFactor: 2.0,
connectionMaxRetries: 5,
},
),
},
)
while (!subscription.isClosed) {
const event = await subscription.waitForEvent(10 * 1000)
if (event !== null) {
console.log(`Received event: ${JSON.stringify(event)}`)
} else {
console.log('No new event yet')
}
}
The subscribe_to_events_blocking
method allows you to subscribe to the creation events of an
HealthElement
. In this example, we filter the events to receive only those with the tag type INTERNAL_INFERENCE_STATUS
and the tag code TO_BE_STARTED
.
This method returns an EntitySubscription
that allows us to consume the events.
To consume the events, we use the wait_for_event_blocking
method, which lets us wait for an event for a
certain time. If no event is received, the method returns None
.
We loop as long as the EntitySubscription
is not closed.
from cardinal_sdk.filters import ServiceFilters
from cardinal_sdk.model import EntitySubscriptionConfiguration
from datetime import timedelta
subscription = sdk.health_element.subscribe_to_events_blocking(
[SubscriptionEventType.Create],
HealthElementFilters.by_tag_for_self(
"INTERNAL_INFERENCE_STATUS",
"TO_BE_STARTED",
),
EntitySubscriptionConfiguration(
channel_buffer_capacity=100,
on_buffer_full=EntitySubscriptionConfiguration.FullBufferBehaviour.Close,
reconnection_delay=timedelta(seconds=5),
retry_delay_exponent_factor=2.0,
connection_max_retries=5,
)
)
while subscription.get_close_reason() is None:
event = subscription.wait_for_event_blocking(timedelta(seconds=10))
if event is not None:
print(event)
else:
print("No new event yet")
All events received by the SDK are encrypted. Therefore, you will need to decrypt them before processing.
Some explanations​
Buffered events​
We have completely abstracted the various concepts used to establish the connection and consume events. The goal is to provide you with a tool that allows you to react to events without worrying about setting up a WebSocket connection, handling connection errors, etc.
Events are sent by the Cardinal backend, and this event is then added to a buffer. You can specify the size of this buffer and the behavior to adopt if the buffer is full.
This buffer contains events that have not yet been consumed. An event can be:
- A connection event (emitted once during the initial connection)
- An error event:
- An error event due to a missed ping from the server
- An error event due to backend closure
- An unexpected error event
- A reconnection event (automatically initiated in case of a non-fatal error)
- An event with an entity (Service, Contact, etc.) that has been created or modified (depending on your filter)
- A deserialization error event (if an event cannot be deserialized)
This mechanism allows you to continue consuming events even if the connection is lost, without losing those already collected before the connection loss.
We will provide a more complete example later in this document.
Ping mechanism​
Cardinal SDK uses a ping mechanism to check the connection with the backend. This ping/pong is initiated by the backend after receiving the subscription message from the client.
The client must respond to this ping with a pong within a given time window. If the client does not respond, the backend considers the connection lost and closes the connection. Conversely, the client expects to receive a ping from the backend within a given time window. If the client does not receive a ping, it considers the connection lost and closes the connection.
In both cases, the client attempts to reconnect automatically in the event of a connection loss. You can specify the reconnection delay, the reconnection delay exponent factor, and the number of reconnection attempts.
If, after the number of reconnection attempts, the connection cannot be restored, a close event is emitted with the reason for the closure.
Close reasons​
Possible close reasons are as follows:
ChannelFull
: The buffer is full, and the close behavior is set toClose
ConnectionLost
: The connection was lost, and the maximum number of reconnection attempts was reachedIntentionallyClosed
: The connection was intentionally closed by the client Dans le cas où le nombre de tentatives de reconnexion est atteint, la connexion est fermée. Vous pouvez consultez la raison de la fermeture de la connexion et réagir en conséquence.
- Kotlin
- Typescript
- Python
import com.icure.cardinal.sdk.subscription.EntitySubscriptionCloseReason
subscription.closeReason?.let {
when (it) {
EntitySubscriptionCloseReason.ChannelFull -> println("Channel full")
EntitySubscriptionCloseReason.ConnectionLost -> println("Connection lost")
EntitySubscriptionCloseReason.IntentionallyClosed -> println("Subscription closed")
}
}
import {
EntitySubscriptionCloseReason,
} from "@icure/cardinal-sdk";
switch (subscription.closeReason) {
case null:
console.log("Subscription closed")
break
case EntitySubscriptionCloseReason.IntentionallyClosed:
console.log("Subscription closed intentionally")
break
case EntitySubscriptionCloseReason.ChannelFull:
console.log("Subscription closed because channel is full")
break
case EntitySubscriptionCloseReason.ConnectionLost:
console.log("Subscription closed because connection was lost")
break
default:
console.log("Subscription closed for unknown reason")
}
from cardinal_sdk.model import EntitySubscriptionCloseReason
if subscription.get_close_reason() == EntitySubscriptionCloseReason.IntentionallyClosed:
print("Subscription closed intentionally")
elif subscription.get_close_reason() == EntitySubscriptionCloseReason.ChannelFull:
print("Subscription closed because channel is full")
elif subscription.get_close_reason() == EntitySubscriptionCloseReason.ConnectionLost:
print("Subscription closed because connection was lost")
else:
print("Subscription closed for unknown reason")
Advanced usage​
Now that we know the basics, we can determine how to handle different error cases and enable your service to react accordingly.
Recover from a connection error​
During a connection issue, the SDK automatically attempts to reconnect. You can respond to the Reconnected
event upon
receiving it.
Our documentation focuses on practical aspects, so how to react to this event shouldn't be directly copied in a production environment. You will need to adapt your recovery strategy based on your environment and how you consume events. Feel free to reach out to us to discuss your use case and the best way to get back on track after a reconnection.
In our case, the approach is as follows: upon the Connected
or Reconnected
event, we attempt to retrieve the missed
events, a process handled by the getMissedEvents
/ get_missed_event
method.
getMissedEvents
/ get_missed_event
allows for retrieving missed events by filtering them using filterHealthElementsBy
/ filter_health_elements_by_blocking
.
Each processed element is added to a queue for handling, and once an element is processed, it is updated with a new tag
INTERNAL_INFERENCE_STATUS
with the value DONE
, and therefore will no longer match the initial filter.
At application startup, we retrieve the missed events and process them. We then wait for new events and process them as they come. If a reconnection event is received, we retrieve and process the missed events.
In this example, the number of reconnection attempts is limited to 5. If the limit is reached, the connection will be closed.
- Kotlin
- Typescript
- Python
import com.icure.cardinal.sdk.filters.HealthElementFilters
import com.icure.cardinal.sdk.subscription.EntitySubscriptionConfiguration
import com.icure.cardinal.sdk.subscription.EntitySubscriptionEvent
import com.icure.cardinal.sdk.subscription.SubscriptionEventType
import com.icure.cardinal.sdk.utils.pagination.forEach
import com.icure.kryptom.crypto.defaultCryptoService
import kotlinx.coroutines.channels.consumeEach
import kotlin.time.Duration.Companion.seconds
suspend fun getMissedEvents() {
sdk.healthElement.filterHealthElementsBy(
HealthElementFilters.byTagForSelf(
tagType = "INTERNAL_INFERENCE_STATUS",
tagCode = "TO_BE_STARTED",
)
).forEach {
addToQueueToProcess(it)
}
}
sdk.healthElement.subscribeToEvents(
setOf(SubscriptionEventType.Create),
HealthElementFilters.byTagForSelf(
tagType = "INTERNAL_INFERENCE_STATUS",
tagCode = "TO_BE_STARTED",
),
EntitySubscriptionConfiguration(
channelBufferCapacity = 100,
onBufferFull = EntitySubscriptionConfiguration.FullBufferBehaviour.Close,
reconnectionDelay = 2.seconds,
retryDelayExponentFactor = 2.0,
connectionMaxRetries = 5,
)
).eventChannel.consumeEach {
when(it) {
is EntitySubscriptionEvent.Connected -> {
println("Starting listening for events, processing missed events")
getMissedEvents()
}
is EntitySubscriptionEvent.Reconnected -> {
println("Reconnected, starting to process missed events")
getMissedEvents()
}
is EntitySubscriptionEvent.EntityNotification -> {
// addToQueueToProcess(sdk.healthElement.decrypt(it.entity))
addToQueueToProcess(it.entity)
}
else -> println("Unexpected event: $it")
}
}
import {
DecryptedHealthElement,
EntitySubscriptionConfiguration,
EntitySubscriptionEvent,
HealthElement,
HealthElementFilters,
StorageFacade,
SubscriptionEventType,
} from "@icure/cardinal-sdk";
import UsernamePassword = AuthenticationMethod.UsingCredentials.UsernamePassword;
import EntityNotification = EntitySubscriptionEvent.EntityNotification;
const getMissedEvents = async (): Promise<Array<DecryptedHealthElement>> => {
const paginatedListIterator = await sdk.healthElement.encrypted.filterHealthElementsBy(
HealthElementFilters.byTagForSelf(
"INTERNAL_INFERENCE_STATUS",
{
tagCode: "TO_BE_STARTED",
}
)
)
const batch = 100
const events = []
while (await paginatedListIterator.hasNext()) {
const page = await paginatedListIterator.next(batch)
events.push(...page)
}
return events
}
const subscription = await sdk.healthElement.subscribeToEvents(
[SubscriptionEventType.Create],
HealthElementFilters.byTagForSelf(
"INTERNAL_INFERENCE_STATUS",
{
tagCode: "TO_BE_STARTED",
},
),
{
subscriptionConfig: new EntitySubscriptionConfiguration(
{
channelBufferCapacity: 100,
onBufferFull: EntitySubscriptionConfiguration.FullBufferBehaviour.Close,
reconnectionDelay: 2 * 1000,
retryDelayExponentFactor: 2.0,
connectionMaxRetries: 5
}
)
}
)
while (!subscription.isClosed) {
const event = await subscription.waitForEvent(10 * 1000)
if (event !== null) {
if (event instanceof EntityNotification) {
// await addToQueueToProcess(await sdk.healthElement.decrypt(event.entity))
await addToQueueToProcess(event.entity)
}
if (event.$ktClass == EntitySubscriptionEvent.Connected.$ktClass || event.$ktClass == EntitySubscriptionEvent.Reconnected.$ktClass) {
const missedEvents = await getMissedEvents()
for (const missedEvent of missedEvents) {
await addToQueueToProcess(missedEvent)
}
}
}
else {
console.log("No new event yet")
}
}
from cardinal_sdk.filters import HealthElementFilters
from cardinal_sdk.model import EntitySubscriptionConfiguration, SubscriptionEventType
from cardinal_sdk.subscription import EntitySubscriptionEvent
from typing import List
def get_missed_event() -> List[DecryptedHealthElement]:
iterator = sdk.health_element.filter_health_elements_by_blocking(
HealthElementFilters.by_tag_for_self(
"INTERNAL_INFERENCE_STATUS",
"TO_BE_STARTED",
)
)
elements = []
batch_size = 100
while iterator.has_next_blocking():
elements.extend(iterator.next_blocking(batch_size))
return elements
subscription = sdk.health_element.subscribe_to_events_blocking(
[SubscriptionEventType.Create],
HealthElementFilters.by_tag_for_self(
"INTERNAL_INFERENCE_STATUS",
"TO_BE_STARTED",
),
EntitySubscriptionConfiguration(
channel_buffer_capacity=100,
on_buffer_full=EntitySubscriptionConfiguration.FullBufferBehaviour.Close,
reconnection_delay=timedelta(seconds=5),
retry_delay_exponent_factor=2.0,
connection_max_retries=5,
)
)
while subscription.get_close_reason() is None:
event = subscription.wait_for_event_blocking(timedelta(seconds=10))
if event is not None:
if event.type in [EntitySubscriptionEvent.Type.Connected, EntitySubscriptionEvent.Type.Reconnected]:
missed_events = get_missed_event()
for missed_event in missed_events:
addToQueueToProcess(missed_event)
elif event.type == EntitySubscriptionEvent.Type.EntityNotification:
# addToQueueToProcess(sdk.health_element.decrypt_blocking(event.entity))
addToQueueToProcess(event.entity)
else:
print("No new event yet")