Implement the Subscriber

Subscriber Application Overview

A Subscriber is any backend application that must act as soon as data are created, modified, or deleted. One example is an AI algorithm that runs inference on medical samples as they are created.

In this example, the Subscriber listens for newly created samples of a certain type and assigns a label to each one based on its value.

Initialize the Subscriber

The first step for the Subscriber is to log in as a healthcare party to initialize the SDK. Use the credentials that you obtained from the cockpit; they should be the same credentials you used to initialize the Publisher.

private const val CARDINAL_URL = "https://api.icure.cloud"

print("Login: ")
val username = readln().trim()
print("Password: ")
val password = readln().trim()
val sdk = CardinalSdk.initialize(
    applicationId = null,
    baseUrl = CARDINAL_URL,
    authenticationMethod = AuthenticationMethod.UsingCredentials(
        UsernamePassword(username, password)
    ),
    baseStorage = FileStorageFacade("./scratch/storage")
)
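
Because a Subscriber usually runs unattended on a backend, prompting on standard input may not always be practical. The following is a minimal sketch, not part of the SDK, of a helper that takes a credential from an environment variable when one is set and falls back to the interactive prompt otherwise. The function `resolveCredential` and the variable names `CARDINAL_USERNAME` and `CARDINAL_PASSWORD` are assumptions made for this illustration.

```kotlin
// Hypothetical helper: returns the trimmed value of `key` from `env`,
// or the trimmed result of `prompt()` when the variable is missing or blank.
fun resolveCredential(
    env: Map<String, String>,
    key: String,
    prompt: () -> String
): String = env[key]?.trim()?.takeIf { it.isNotEmpty() } ?: prompt().trim()

fun main() {
    val env = System.getenv()
    // The environment variable names below are illustrative assumptions.
    val username = resolveCredential(env, "CARDINAL_USERNAME") {
        print("Login: ")
        readln()
    }
    val password = resolveCredential(env, "CARDINAL_PASSWORD") {
        print("Password: ")
        readln()
    }
    println("Logging in as $username (password length: ${password.length})")
}
```

The resulting `username` and `password` can then be passed to `UsernamePassword` exactly as in the snippet above.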

Next, the Subscriber defines which types of data it wants to receive, which it does using a filter. The following filter includes all the service entities that:

  • Are shared with the healthcare party.
  • Have a CodeStub in the tag field with type LOINC and code 2339-0.
  • Have a CodeStub in the tag field with type CARDINAL and code TO_BE_ANALYZED.

val filter = ServiceFilters.byTagAndValueDateForSelf(
    tagType = "LOINC",
    tagCode = "2339-0"
).and(
    ServiceFilters.byTagAndValueDateForSelf(
        tagType = "CARDINAL",
        tagCode = "TO_BE_ANALYZED"
    )
)
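
To make the meaning of the combined filter concrete, the sketch below models it locally as a plain predicate over in-memory objects. It is only an illustration of the "both tags must be present" semantics: `Tag`, `Sample`, `byTag`, and the local `and` are hypothetical stand-ins, whereas the real filter is built with `ServiceFilters` and evaluated by the backend.

```kotlin
// Hypothetical stand-ins for the SDK types, reduced to what the filter inspects.
data class Tag(val type: String, val code: String)
data class Sample(val id: String, val tags: Set<Tag>)

// Matches samples that carry a tag with the given type and code.
fun byTag(type: String, code: String): (Sample) -> Boolean =
    { sample -> sample.tags.any { it.type == type && it.code == code } }

// Combines two predicates: a sample matches only if it satisfies both.
infix fun ((Sample) -> Boolean).and(other: (Sample) -> Boolean): (Sample) -> Boolean =
    { sample -> this.invoke(sample) && other.invoke(sample) }

fun main() {
    val filter = byTag("LOINC", "2339-0") and byTag("CARDINAL", "TO_BE_ANALYZED")
    val samples = listOf(
        Sample("a", setOf(Tag("LOINC", "2339-0"), Tag("CARDINAL", "TO_BE_ANALYZED"))),
        Sample("b", setOf(Tag("LOINC", "2339-0"))),
        Sample("c", setOf(Tag("CARDINAL", "TO_BE_ANALYZED")))
    )
    println(samples.filter(filter).map { it.id }) // prints [a]
}
```

Only the sample that carries both tags passes, which is why a service stops matching as soon as one of the two tags is removed.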

Finally, the Subscriber opens the connection using the filter and receives an event whenever a service that matches the filter is created or updated.

For more information about subscriptions and events, check this how-to.

val subscription = sdk.contact.subscribeToServiceCreateOrUpdateEvents(
    filter = filter
)

React to Events

Once the subscription is opened, the Subscriber can wait for the events to be produced:

for (event in subscription.eventChannel) {
    // The code for handling the event will go here
}

The channel can produce different types of events. In this example, the Subscriber handles only the events related to opening a connection and receiving an entity, and logs any other event.

for (event in subscription.eventChannel) {
    when (event) {
        EntitySubscriptionEvent.Connected -> {
            println("Successfully opened connection")
        }
        is EntitySubscriptionEvent.EntityNotification -> {
            // The rest of the code to handle the entity will go here
        }
        else -> println("Unexpected event: $event")
    }
}
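
The dispatch in the loop above can be tried out without a live connection. The sketch below uses a hypothetical, simplified event hierarchy (`Event`, with an invented `Other` case standing in for every event the example does not handle) to show how `when` matches a singleton by value and a class with `is`. It does not reproduce the SDK's actual event types.

```kotlin
// Hypothetical, simplified stand-in for the SDK event hierarchy.
sealed interface Event {
    object Connected : Event
    data class EntityNotification(val entityId: String) : Event
    data class Other(val description: String) : Event
}

// Returns the log line for one event, mirroring the branches of the loop above.
fun describe(event: Event): String = when (event) {
    Event.Connected -> "Successfully opened connection" // singleton: compared by equality
    is Event.EntityNotification -> "Received service ${event.entityId}" // class: matched by type
    else -> "Unexpected event: $event"
}

fun main() {
    val events = listOf(
        Event.Connected,
        Event.EntityNotification("service-1"),
        Event.Other("ping timeout")
    )
    events.forEach { println(describe(it)) }
}
```

Keeping the `else` branch means that event types not handled explicitly are at least logged rather than silently dropped.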

When a service that is shared with the healthcare party and carries the defined tags is created or updated, the corresponding event is fired. The event contains the encrypted entity, so the first step for the Subscriber is to decrypt it.

is EntitySubscriptionEvent.EntityNotification -> {
    val service = sdk.contact.decryptService(event.entity)
    println("Received service ${service.id}")
    // The rest of the code to handle the entity will go here
}

Now the Subscriber can check whether the service has a blood sugar level measurement. If it does, the Subscriber chooses a CodeStub to represent the diagnosis based on that value: hypoglycemia below 80, hyperglycemia above 130, and normal range otherwise.

is EntitySubscriptionEvent.EntityNotification -> {
    val service = sdk.contact.decryptService(event.entity)
    println("Received service ${service.id}")
    val measureOrNull = service.content["en"]?.measureValue?.value
    if (measureOrNull != null) {
        val inferenceResult = when {
            measureOrNull < 80 -> {
                CodeStub( // Snomed code for Hypoglycemia
                    id = "SNOMED|302866003|1",
                    type = "SNOMED",
                    code = "302866003",
                    version = "1"
                )
            }
            measureOrNull > 130 -> {
                CodeStub( // Snomed code for Hyperglycemia
                    id = "SNOMED|80394007|1",
                    type = "SNOMED",
                    code = "80394007",
                    version = "1"
                )
            }
            else -> {
                CodeStub( // Snomed code for normal range
                    id = "SNOMED|260395002|1",
                    type = "SNOMED",
                    code = "260395002",
                    version = "1"
                )
            }
        }
        // The rest of the code to handle the entity will go here
    } else {
        println("Service with id ${service.id} does not contain a valid measure.")
    }
}
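
The threshold logic can also be isolated in a pure function, which makes the boundary behavior easy to check. The sketch below is one possible way to do that; `Code` is a hypothetical stand-in for the SDK's `CodeStub`, and `classifyGlucose` is a name introduced only for this illustration. The thresholds and SNOMED codes are the ones used in the snippet above.

```kotlin
// Hypothetical stand-in for CodeStub, reduced to the fields used in this tutorial.
data class Code(val type: String, val code: String, val version: String) {
    // Same "type|code|version" id format as the CodeStubs above.
    val id: String get() = "$type|$code|$version"
}

// Maps a measured value to a diagnosis code. Both comparisons are strict,
// so values of exactly 80 or exactly 130 fall in the normal range.
fun classifyGlucose(value: Double): Code = when {
    value < 80.0 -> Code("SNOMED", "302866003", "1")  // Hypoglycemia
    value > 130.0 -> Code("SNOMED", "80394007", "1")  // Hyperglycemia
    else -> Code("SNOMED", "260395002", "1")          // Normal range
}

fun main() {
    println(classifyGlucose(72.0).id)  // prints SNOMED|302866003|1
    println(classifyGlucose(80.0).id)  // prints SNOMED|260395002|1
    println(classifyGlucose(145.0).id) // prints SNOMED|80394007|1
}
```

Separating the classification from the event handling also means the thresholds can be unit tested without opening a subscription.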

Finally, the Subscriber updates the service: it removes the internal TO_BE_ANALYZED tag, then adds the diagnosis tag and another internal tag, ANALYZED, to signal that the service was processed.

However, a service is nested inside a contact, so the Subscriber must retrieve the contact and update the service inside it. To retrieve the contact that contains the service, use the contactId field on the service, which is filled in automatically on creation.

is EntitySubscriptionEvent.EntityNotification -> {
    val service = sdk.contact.decryptService(event.entity)
    println("Received service ${service.id}")
    val measureOrNull = service.content["en"]?.measureValue?.value
    if (measureOrNull != null) {
        val inferenceResult = when {
            measureOrNull < 80 -> {
                CodeStub( // Snomed code for Hypoglycemia
                    id = "SNOMED|302866003|1",
                    type = "SNOMED",
                    code = "302866003",
                    version = "1"
                )
            }
            measureOrNull > 130 -> {
                CodeStub( // Snomed code for Hyperglycemia
                    id = "SNOMED|80394007|1",
                    type = "SNOMED",
                    code = "80394007",
                    version = "1"
                )
            }
            else -> {
                CodeStub( // Snomed code for normal range
                    id = "SNOMED|260395002|1",
                    type = "SNOMED",
                    code = "260395002",
                    version = "1"
                )
            }
        }
        val analyzedCodeStub = CodeStub(
            id = "CARDINAL|ANALYZED|1",
            type = "CARDINAL",
            code = "ANALYZED",
            version = "1"
        )
        val updatedService = service.copy(
            tags = service.tags.filterNot {
                it.type == "CARDINAL" && it.code == "TO_BE_ANALYZED"
            }.toSet() + setOf(inferenceResult, analyzedCodeStub)
        )
        if (service.contactId != null) {
            val contact = sdk.contact.getContact(service.contactId!!)
            sdk.contact.modifyContact(
                contact.copy(
                    services = contact.services.filterNot {
                        it.id == updatedService.id
                    }.toSet() + setOf(updatedService)
                )
            )
            println("Successfully updated contact")
        } else {
            println("Cannot find parent contact")
        }
    } else {
        println("Service with id ${service.id} does not contain a valid measure.")
    }
}
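
The update above first filters out the old version of the service by id instead of simply adding the new one to the set. The sketch below illustrates why, using hypothetical stand-in types (`Tag`, `Sample`) and helper names (`markAnalyzed`, `replaceById`) rather than the SDK classes: the old and new versions differ in their tags, so they are not equal, and a set would keep both.

```kotlin
// Hypothetical stand-ins for CodeStub and the decrypted service.
data class Tag(val type: String, val code: String)
data class Sample(val id: String, val tags: Set<Tag>)

// Drops the TO_BE_ANALYZED marker and adds the diagnosis plus the ANALYZED marker.
fun markAnalyzed(sample: Sample, diagnosis: Tag): Sample = sample.copy(
    tags = sample.tags.filterNot {
        it.type == "CARDINAL" && it.code == "TO_BE_ANALYZED"
    }.toSet() + setOf(diagnosis, Tag("CARDINAL", "ANALYZED"))
)

// Replaces the element that has the same id as `updated`, leaving the others untouched.
fun replaceById(samples: Set<Sample>, updated: Sample): Set<Sample> =
    samples.filterNot { it.id == updated.id }.toSet() + updated

fun main() {
    val original = Sample("s1", setOf(Tag("LOINC", "2339-0"), Tag("CARDINAL", "TO_BE_ANALYZED")))
    val updated = markAnalyzed(original, Tag("SNOMED", "260395002"))
    val services = setOf(original, Sample("s2", emptySet()))

    println((services + updated).size)             // prints 3: both versions of s1 are kept
    println(replaceById(services, updated).size)   // prints 2: s1 was replaced
}
```

Since the updated service no longer carries the TO_BE_ANALYZED tag, it should no longer match the subscription filter, so the Subscriber's own update is not expected to trigger another round of processing.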

After updating the contact, the Subscriber goes back to waiting for the next event.

The complete code of the loop for receiving and handling the events is shown below.

for (event in subscription.eventChannel) {
    when (event) {
        EntitySubscriptionEvent.Connected -> {
            println("Successfully opened connection")
        }
        is EntitySubscriptionEvent.EntityNotification -> {
            val service = sdk.contact.decryptService(event.entity)
            println("Received service ${service.id}")
            val measureOrNull = service.content["en"]?.measureValue?.value
            if (measureOrNull != null) {
                val inferenceResult = when {
                    measureOrNull < 80 -> {
                        CodeStub( // Snomed code for Hypoglycemia
                            id = "SNOMED|302866003|1",
                            type = "SNOMED",
                            code = "302866003",
                            version = "1"
                        )
                    }
                    measureOrNull > 130 -> {
                        CodeStub( // Snomed code for Hyperglycemia
                            id = "SNOMED|80394007|1",
                            type = "SNOMED",
                            code = "80394007",
                            version = "1"
                        )
                    }
                    else -> {
                        CodeStub( // Snomed code for normal range
                            id = "SNOMED|260395002|1",
                            type = "SNOMED",
                            code = "260395002",
                            version = "1"
                        )
                    }
                }
                val analyzedCodeStub = CodeStub(
                    id = "CARDINAL|ANALYZED|1",
                    type = "CARDINAL",
                    code = "ANALYZED",
                    version = "1"
                )
                val updatedService = service.copy(
                    tags = service.tags.filterNot {
                        it.type == "CARDINAL" && it.code == "TO_BE_ANALYZED"
                    }.toSet() + setOf(inferenceResult, analyzedCodeStub)
                )
                if (service.contactId != null) {
                    val contact = sdk.contact.getContact(service.contactId!!)
                    sdk.contact.modifyContact(
                        contact.copy(
                            services = contact.services.filterNot {
                                it.id == updatedService.id
                            }.toSet() + setOf(updatedService)
                        )
                    )
                    println("Successfully updated contact")
                } else {
                    println("Cannot find parent contact")
                }
            } else {
                println("Service with id ${service.id} does not contain a valid measure.")
            }
        }
        else -> println("Unexpected event: $event")
    }
}