Implement the Subscriber
Subscriber Application Overview
A Subscriber is any application that runs on a backend and must act immediately when data are created, modified, or deleted. An example is an AI algorithm that runs inference on medical samples as soon as they are created.
In this example, the Subscriber will listen for newly created samples of a certain type and assign a label to each one based on its value.
Initialize the Subscriber
The first step for the Subscriber is to log in as a healthcare party to initialize the SDK. Use the credentials that you obtained from the cockpit; they should be the same credentials you used to initialize the Publisher.
- Kotlin
- Python
- Typescript
private const val CARDINAL_URL = "https://api.icure.cloud"
print("Login: ")
val username = readln().trim()
print("Password: ")
val password = readln().trim()
val sdk = CardinalSdk.initialize(
    applicationId = null,
    baseUrl = CARDINAL_URL,
    authenticationMethod = AuthenticationMethod.UsingCredentials(
        UsernamePassword(username, password)
    ),
    baseStorage = FileStorageFacade("./scratch/storage")
)
CARDINAL_URL = "https://api.icure.cloud"
username = input("Username: ")
password = input("Password: ")
sdk = CardinalSdk(
    application_id=None,
    baseurl=CARDINAL_URL,
    authentication_method=UsernamePassword(username, password),
    storage_facade=FileSystemStorage("../scratch/storage")
)
const CARDINAL_URL = "https://api.icure.cloud"
const username = await readLn("Login: ")
const password = await readLn("Password: ")
const sdk = await CardinalSdk.initialize(
    undefined,
    CARDINAL_URL,
    new AuthenticationMethod.UsingCredentials.UsernamePassword(username, password),
    StorageFacade.usingFileSystem("../scratch/storage")
)
Next, the Subscriber must declare which types of data it wants to receive, which it does using a filter. The following filter matches all the service entities that:
- Are shared with the healthcare party.
- Have a CodeStub in the tag field with type LOINC and code 2339-0.
- Have a CodeStub in the tag field with type CARDINAL and code TO_BE_ANALYZED.
- Kotlin
- Python
- Typescript
val filter = ServiceFilters.byTagAndValueDateForSelf(
    tagType = "LOINC",
    tagCode = "2339-0"
).and(
    ServiceFilters.byTagAndValueDateForSelf(
        tagType = "CARDINAL",
        tagCode = "TO_BE_ANALYZED"
    )
)
service_filter = intersection(
    ServiceFilters.by_tag_and_value_date_for_self(
        tag_type="LOINC",
        tag_code="2339-0"
    ),
    ServiceFilters.by_tag_and_value_date_for_self(
        tag_type="CARDINAL",
        tag_code="TO_BE_ANALYZED"
    )
)
const filter = intersection(
    ServiceFilters.byTagAndValueDateForSelf(
        "LOINC",
        {tagCode: "2339-0"}
    ),
    ServiceFilters.byTagAndValueDateForSelf(
        "CARDINAL",
        {tagCode: "TO_BE_ANALYZED"}
    )
)
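To make the meaning of the combined filter concrete, the sketch below models it as a plain predicate over a list of tags. It does not use the SDK: the Tag class and the has_tag and matches_subscription helpers are hypothetical names introduced only for illustration, and the "shared with the healthcare party" condition is not modelled.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tag:
    """Hypothetical stand-in for a CodeStub: only type and code are modelled."""
    type: str
    code: str


def has_tag(tags, tag_type, tag_code):
    """Return True if any tag has the given type and code."""
    return any(t.type == tag_type and t.code == tag_code for t in tags)


def matches_subscription(tags):
    """Intersection of the two tag conditions: both must hold."""
    return (
        has_tag(tags, "LOINC", "2339-0")
        and has_tag(tags, "CARDINAL", "TO_BE_ANALYZED")
    )


pending = [Tag("LOINC", "2339-0"), Tag("CARDINAL", "TO_BE_ANALYZED")]
analyzed = [Tag("LOINC", "2339-0"), Tag("CARDINAL", "ANALYZED")]
unrelated = [Tag("CARDINAL", "TO_BE_ANALYZED")]

print(matches_subscription(pending))    # True
print(matches_subscription(analyzed))   # False
print(matches_subscription(unrelated))  # False
```

A service carrying only one of the two tags does not match, which is what distinguishes an intersection from a union of the two filters.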
Finally, the Subscriber can open the connection using the filter, receiving an event whenever a service that matches the filter is created or updated.
For more information about subscriptions and events, check this how-to.
- Kotlin
- Python
- Typescript
val subscription = sdk.contact.subscribeToServiceCreateOrUpdateEvents(
    filter = filter
)
subscription = sdk.contact.subscribe_to_service_create_or_update_events_blocking(
    filter=service_filter
)
const subscription = await sdk.contact.subscribeToServiceCreateOrUpdateEvents(
    filter
)
React to Events
Once the subscription is opened, the Subscriber can wait for the events to be produced:
- Kotlin
- Python
- Typescript
for (event in subscription.eventChannel) {
    // The code for handling the event will go here
}
from datetime import timedelta

while subscription.get_close_reason() is None:
    # Wait up to 10 seconds; the result is None if no event arrives in time
    event = subscription.wait_for_event_blocking(timedelta(seconds=10))
    # The code for handling the event will go here
while (!subscription.isClosed) {
    // Wait up to 10 seconds; the result is null if no event arrives in time
    const event = await subscription.waitForEvent(10 * 1000)
    // The code for handling the event will go here
}
Different types of events can be produced by the channel. In this example, the Subscriber will handle only the events related to opening a connection and receiving an entity.
- Kotlin
- Python
- Typescript
for (event in subscription.eventChannel) {
    when (event) {
        EntitySubscriptionEvent.Connected -> {
            println("Successfully opened connection")
        }
        is EntitySubscriptionEvent.EntityNotification -> {
            // The rest of the code to handle the entity will go here
        }
        else -> println("Unexpected event: $event")
    }
}
while subscription.get_close_reason() is None:
    event = subscription.wait_for_event_blocking(timedelta(seconds=10))
    if event is None:
        print("No event yet")
    elif event.type == EntitySubscriptionEvent.Type.Connected:
        print("Successfully opened connection")
    elif event.type == EntitySubscriptionEvent.Type.EntityNotification:
        # The rest of the code to handle the entity will go here
        pass
    else:
        print(f"Unexpected event: {event.type}")
while (!subscription.isClosed) {
    const event = await subscription.waitForEvent(10 * 1000)
    if (event === null) {
        console.log("No event yet")
    } else if (event.$ktClass === EntitySubscriptionEvent.Connected.$ktClass) {
        console.log("Successfully opened connection")
    } else if (event instanceof EntitySubscriptionEvent.EntityNotification) {
        // The rest of the code to handle the entity will go here
    } else {
        console.log(`Unexpected event: ${event.$ktClass}`)
    }
}
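The polling pattern used in the Python and Typescript loops above (wait with a timeout, then branch on the kind of event) can be tried out without a backend. The sketch below imitates it with the standard library queue.Queue; the drain_events function, the string and dictionary event shapes, and the max_polls bound are all assumptions made for illustration and are not part of the SDK.

```python
import queue


def drain_events(events, timeout_seconds, max_polls):
    """Poll `events` up to `max_polls` times and return a log of what was seen."""
    log = []
    for _ in range(max_polls):
        try:
            # Block until an event arrives or the timeout expires
            event = events.get(timeout=timeout_seconds)
        except queue.Empty:
            # Equivalent of receiving None/null from the wait call
            log.append("No event yet")
            continue
        if event == "Connected":
            log.append("Successfully opened connection")
        elif isinstance(event, dict) and event.get("type") == "EntityNotification":
            log.append(f"Received entity {event['id']}")
        else:
            log.append(f"Unexpected event: {event}")
    return log


events = queue.Queue()
events.put("Connected")
events.put({"type": "EntityNotification", "id": "service-1"})

log = drain_events(events, timeout_seconds=0.01, max_polls=3)
for line in log:
    print(line)
```

The third poll finds the queue empty and times out, which mirrors the "No event yet" branch: a timeout is a normal outcome of waiting, not an error.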
When a new service shared with the healthcare party and with the defined tags is created or updated, the appropriate event is fired. The event also contains the encrypted entity, so the first step for the Subscriber is to decrypt it.
- Kotlin
- Python
- Typescript
is EntitySubscriptionEvent.EntityNotification -> {
    val service = sdk.contact.decryptService(event.entity)
    println("Received service ${service.id}")
    // The rest of the code to handle the entity will go here
}
elif event.type == EntitySubscriptionEvent.Type.EntityNotification:
    service = sdk.contact.decrypt_service_blocking(event.entity)
    print(f"Received service {service.id}")
else if (event instanceof EntitySubscriptionEvent.EntityNotification) {
    const service = await sdk.contact.decryptService(event.entity)
    console.log(`Received service ${service.id}`)
    // The rest of the code to handle the entity will go here
}
Now the Subscriber can check if the service has a blood sugar level measurement. If it has, it will choose a CodeStub to represent the diagnosis based on that value.
- Kotlin
- Python
- Typescript
is EntitySubscriptionEvent.EntityNotification -> {
    val service = sdk.contact.decryptService(event.entity)
    println("Received service ${service.id}")
    val measureOrNull = service.content["en"]?.measureValue?.value
    if (measureOrNull != null) {
        val inferenceResult = when {
            measureOrNull < 80 -> {
                CodeStub( // Snomed code for Hypoglycemia
                    id = "SNOMED|302866003|1",
                    type = "SNOMED",
                    code = "302866003",
                    version = "1"
                )
            }
            measureOrNull > 130 -> {
                CodeStub( // Snomed code for Hyperglycemia
                    id = "SNOMED|80394007|1",
                    type = "SNOMED",
                    code = "80394007",
                    version = "1"
                )
            }
            else -> {
                CodeStub( // Snomed code for normal range
                    id = "SNOMED|260395002|1",
                    type = "SNOMED",
                    code = "260395002",
                    version = "1"
                )
            }
        }
        // The rest of the code to handle the entity will go here
    } else {
        println("Service with id ${service.id} does not contain a valid measure.")
    }
}
elif event.type == EntitySubscriptionEvent.Type.EntityNotification:
    service = sdk.contact.decrypt_service_blocking(event.entity)
    print(f"Received service {service.id}")
    content = service.content.get("en")
    # Guard against both a missing content entry and a missing measure
    measure_value = content.measure_value if content is not None else None
    if measure_value is not None and measure_value.value is not None:
        measure = measure_value.value
        if measure < 80:
            inference_result = CodeStub(  # Snomed code for Hypoglycemia
                id="SNOMED|302866003|1",
                type="SNOMED",
                code="302866003",
                version="1"
            )
        elif measure > 130:
            inference_result = CodeStub(  # Snomed code for Hyperglycemia
                id="SNOMED|80394007|1",
                type="SNOMED",
                code="80394007",
                version="1"
            )
        else:
            inference_result = CodeStub(  # Snomed code for normal range
                id="SNOMED|260395002|1",
                type="SNOMED",
                code="260395002",
                version="1"
            )
        # The rest of the code to handle the entity will go here
    else:
        print(f"Service with id {service.id} does not contain a valid measure.")
else if (event instanceof EntitySubscriptionEvent.EntityNotification) {
    const service = await sdk.contact.decryptService(event.entity)
    console.log(`Received service ${service.id}`)
    const measure = service.content["en"]?.measureValue?.value
    // Compare against null/undefined explicitly so that a value of 0 is not discarded
    if (measure != null) {
        let inferenceResult: CodeStub
        if (measure < 80) {
            // Snomed code for Hypoglycemia
            inferenceResult = new CodeStub({
                id: "SNOMED|302866003|1",
                type: "SNOMED",
                code: "302866003",
                version: "1"
            })
        } else if (measure > 130) {
            // Snomed code for Hyperglycemia
            inferenceResult = new CodeStub({
                id: "SNOMED|80394007|1",
                type: "SNOMED",
                code: "80394007",
                version: "1"
            })
        } else {
            // Snomed code for normal range
            inferenceResult = new CodeStub({
                id: "SNOMED|260395002|1",
                type: "SNOMED",
                code: "260395002",
                version: "1"
            })
        }
        // The rest of the code to handle the entity will go here
    } else {
        console.log(`Service with id ${service.id} does not contain a valid measure.`)
    }
}
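The threshold logic in the snippets above can also be isolated as a pure function, which makes the boundary behavior easy to check. This is a minimal sketch that does not depend on the SDK: classify_glucose and code_stub_id are hypothetical helper names, and the result is returned as a plain (type, code) tuple instead of a CodeStub. The thresholds and codes are the ones used in the snippets.

```python
def classify_glucose(value):
    """Map a blood sugar measurement to a (type, code) pair."""
    if value < 80:
        return ("SNOMED", "302866003")  # Hypoglycemia
    if value > 130:
        return ("SNOMED", "80394007")   # Hyperglycemia
    return ("SNOMED", "260395002")      # Normal range


def code_stub_id(code_type, code, version="1"):
    """Build an id in the type|code|version form used by the snippets."""
    return f"{code_type}|{code}|{version}"


for value in (79, 80, 130, 131):
    code_type, code = classify_glucose(value)
    print(value, code_stub_id(code_type, code))
```

Both comparisons are strict, so the boundary values 80 and 130 fall in the normal range.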
Finally, the Subscriber updates the service by removing the internal TO_BE_ANALYZED tag and adding the tag for the diagnosis plus another internal tag to signal that the service was analyzed.
However, a service is a nested entity inside a contact, so the Subscriber must retrieve the contact and update the service inside it. To retrieve the contact that contains the service, use the contactId field of the service, which is filled in automatically on creation.
- Kotlin
- Python
- Typescript
is EntitySubscriptionEvent.EntityNotification -> {
    val service = sdk.contact.decryptService(event.entity)
    println("Received service ${service.id}")
    val measureOrNull = service.content["en"]?.measureValue?.value
    if (measureOrNull != null) {
        val inferenceResult = when {
            measureOrNull < 80 -> {
                CodeStub( // Snomed code for Hypoglycemia
                    id = "SNOMED|302866003|1",
                    type = "SNOMED",
                    code = "302866003",
                    version = "1"
                )
            }
            measureOrNull > 130 -> {
                CodeStub( // Snomed code for Hyperglycemia
                    id = "SNOMED|80394007|1",
                    type = "SNOMED",
                    code = "80394007",
                    version = "1"
                )
            }
            else -> {
                CodeStub( // Snomed code for normal range
                    id = "SNOMED|260395002|1",
                    type = "SNOMED",
                    code = "260395002",
                    version = "1"
                )
            }
        }
        val analyzedCodeStub = CodeStub(
            id = "CARDINAL|ANALYZED|1",
            type = "CARDINAL",
            code = "ANALYZED",
            version = "1"
        )
        val updatedService = service.copy(
            tags = service.tags.filterNot {
                it.type == "CARDINAL" && it.code == "TO_BE_ANALYZED"
            }.toSet() + setOf(inferenceResult, analyzedCodeStub)
        )
        if (service.contactId != null) {
            val contact = sdk.contact.getContact(service.contactId!!)
            sdk.contact.modifyContact(
                contact.copy(
                    services = contact.services.filterNot {
                        it.id == updatedService.id
                    }.toSet() + setOf(updatedService)
                )
            )
            println("Successfully updated contact")
        } else {
            println("Cannot find parent contact")
        }
    } else {
        println("Service with id ${service.id} does not contain a valid measure.")
    }
}
elif event.type == EntitySubscriptionEvent.Type.EntityNotification:
    service = sdk.contact.decrypt_service_blocking(event.entity)
    print(f"Received service {service.id}")
    content = service.content.get("en")
    # Guard against both a missing content entry and a missing measure
    measure_value = content.measure_value if content is not None else None
    if measure_value is not None and measure_value.value is not None:
        measure = measure_value.value
        if measure < 80:
            inference_result = CodeStub(  # Snomed code for Hypoglycemia
                id="SNOMED|302866003|1",
                type="SNOMED",
                code="302866003",
                version="1"
            )
        elif measure > 130:
            inference_result = CodeStub(  # Snomed code for Hyperglycemia
                id="SNOMED|80394007|1",
                type="SNOMED",
                code="80394007",
                version="1"
            )
        else:
            inference_result = CodeStub(  # Snomed code for normal range
                id="SNOMED|260395002|1",
                type="SNOMED",
                code="260395002",
                version="1"
            )
        analyzed_code_stub = CodeStub(
            id="CARDINAL|ANALYZED|1",
            type="CARDINAL",
            code="ANALYZED",
            version="1"
        )
        # Drop the internal TO_BE_ANALYZED tag, then add the diagnosis and ANALYZED tags
        filtered_tags = [
            tag for tag in service.tags
            if not (tag.type == "CARDINAL" and tag.code == "TO_BE_ANALYZED")
        ]
        service.tags = filtered_tags + [inference_result, analyzed_code_stub]
        if service.contact_id is not None:
            contact = sdk.contact.get_contact_blocking(service.contact_id)
            # Replace the old version of the service inside its parent contact
            filtered_services = [s for s in contact.services if s.id != service.id]
            contact.services = filtered_services + [service]
            sdk.contact.modify_contact_blocking(contact)
            print("Successfully updated contact")
        else:
            print("Cannot find parent contact")
    else:
        print(f"Service with id {service.id} does not contain a valid measure.")
else if (event instanceof EntitySubscriptionEvent.EntityNotification) {
    const service = await sdk.contact.decryptService(event.entity)
    console.log(`Received service ${service.id}`)
    const measure = service.content["en"]?.measureValue?.value
    // Compare against null/undefined explicitly so that a value of 0 is not discarded
    if (measure != null) {
        let inferenceResult: CodeStub
        if (measure < 80) {
            // Snomed code for Hypoglycemia
            inferenceResult = new CodeStub({
                id: "SNOMED|302866003|1",
                type: "SNOMED",
                code: "302866003",
                version: "1"
            })
        } else if (measure > 130) {
            // Snomed code for Hyperglycemia
            inferenceResult = new CodeStub({
                id: "SNOMED|80394007|1",
                type: "SNOMED",
                code: "80394007",
                version: "1"
            })
        } else {
            // Snomed code for normal range
            inferenceResult = new CodeStub({
                id: "SNOMED|260395002|1",
                type: "SNOMED",
                code: "260395002",
                version: "1"
            })
        }
        const analyzedCodeStub = new CodeStub({
            id: "CARDINAL|ANALYZED|1",
            type: "CARDINAL",
            code: "ANALYZED",
            version: "1"
        })
        const updatedService = new DecryptedService({
            ...service,
            tags: service.tags
                .filter(it => it.type !== "CARDINAL" || it.code !== "TO_BE_ANALYZED")
                .concat(inferenceResult, analyzedCodeStub)
        })
        if (!!updatedService.contactId) {
            // Use the same value that was just checked
            const contact = await sdk.contact.getContact(updatedService.contactId)
            await sdk.contact.modifyContact(
                new DecryptedContact({
                    ...contact,
                    services: contact.services
                        .filter(it => it.id !== updatedService.id)
                        .concat(updatedService)
                })
            )
            console.log("Successfully updated contact")
        } else {
            console.log("Cannot find parent contact")
        }
    } else {
        console.log(`Service with id ${service.id} does not contain a valid measure.`)
    }
}
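The two list manipulations in the snippets above (swapping the tags on the service, then swapping the service inside its contact) can be sketched on plain data. The code below does not use the SDK: tags are modelled as (type, code) tuples, services as dictionaries, and retag and replace_service are hypothetical helper names.

```python
def retag(tags, diagnosis):
    """Drop the TO_BE_ANALYZED tag, then append the diagnosis and ANALYZED tags."""
    kept = [t for t in tags if t != ("CARDINAL", "TO_BE_ANALYZED")]
    return kept + [diagnosis, ("CARDINAL", "ANALYZED")]


def replace_service(services, updated):
    """Return the services with the entry sharing `updated`'s id replaced."""
    return [s for s in services if s["id"] != updated["id"]] + [updated]


service = {
    "id": "s1",
    "tags": [("LOINC", "2339-0"), ("CARDINAL", "TO_BE_ANALYZED")],
}
other = {"id": "s2", "tags": []}
contact_services = [service, other]

updated = {**service, "tags": retag(service["tags"], ("SNOMED", "80394007"))}
new_services = replace_service(contact_services, updated)

print(updated["tags"])
print([s["id"] for s in new_services])
```

Because the TO_BE_ANALYZED tag is gone from the updated service, it no longer satisfies the subscription filter. Presumably this is what keeps the Subscriber from being notified again about its own modification, although that behavior should be verified against the SDK.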
After updating the contact, the Subscriber will go back and wait for the following event.
Below, you will find the complete code of the loop for receiving and handling the events.
- Kotlin
- Python
- Typescript
for (event in subscription.eventChannel) {
    when (event) {
        EntitySubscriptionEvent.Connected -> {
            println("Successfully opened connection")
        }
        is EntitySubscriptionEvent.EntityNotification -> {
            val service = sdk.contact.decryptService(event.entity)
            println("Received service ${service.id}")
            val measureOrNull = service.content["en"]?.measureValue?.value
            if (measureOrNull != null) {
                val inferenceResult = when {
                    measureOrNull < 80 -> {
                        CodeStub( // Snomed code for Hypoglycemia
                            id = "SNOMED|302866003|1",
                            type = "SNOMED",
                            code = "302866003",
                            version = "1"
                        )
                    }
                    measureOrNull > 130 -> {
                        CodeStub( // Snomed code for Hyperglycemia
                            id = "SNOMED|80394007|1",
                            type = "SNOMED",
                            code = "80394007",
                            version = "1"
                        )
                    }
                    else -> {
                        CodeStub( // Snomed code for normal range
                            id = "SNOMED|260395002|1",
                            type = "SNOMED",
                            code = "260395002",
                            version = "1"
                        )
                    }
                }
                val analyzedCodeStub = CodeStub(
                    id = "CARDINAL|ANALYZED|1",
                    type = "CARDINAL",
                    code = "ANALYZED",
                    version = "1"
                )
                val updatedService = service.copy(
                    tags = service.tags.filterNot {
                        it.type == "CARDINAL" && it.code == "TO_BE_ANALYZED"
                    }.toSet() + setOf(inferenceResult, analyzedCodeStub)
                )
                if (service.contactId != null) {
                    val contact = sdk.contact.getContact(service.contactId!!)
                    sdk.contact.modifyContact(
                        contact.copy(
                            services = contact.services.filterNot {
                                it.id == updatedService.id
                            }.toSet() + setOf(updatedService)
                        )
                    )
                    println("Successfully updated contact")
                } else {
                    println("Cannot find parent contact")
                }
            } else {
                println("Service with id ${service.id} does not contain a valid measure.")
            }
        }
        else -> println("Unexpected event: $event")
    }
}
from datetime import timedelta

while subscription.get_close_reason() is None:
    event = subscription.wait_for_event_blocking(timedelta(seconds=10))
    if event is None:
        print("No event yet")
    elif event.type == EntitySubscriptionEvent.Type.Connected:
        print("Successfully opened connection")
    elif event.type == EntitySubscriptionEvent.Type.EntityNotification:
        service = sdk.contact.decrypt_service_blocking(event.entity)
        print(f"Received service {service.id}")
        content = service.content.get("en")
        # Guard against both a missing content entry and a missing measure
        measure_value = content.measure_value if content is not None else None
        if measure_value is not None and measure_value.value is not None:
            measure = measure_value.value
            if measure < 80:
                inference_result = CodeStub(  # Snomed code for Hypoglycemia
                    id="SNOMED|302866003|1",
                    type="SNOMED",
                    code="302866003",
                    version="1"
                )
            elif measure > 130:
                inference_result = CodeStub(  # Snomed code for Hyperglycemia
                    id="SNOMED|80394007|1",
                    type="SNOMED",
                    code="80394007",
                    version="1"
                )
            else:
                inference_result = CodeStub(  # Snomed code for normal range
                    id="SNOMED|260395002|1",
                    type="SNOMED",
                    code="260395002",
                    version="1"
                )
            analyzed_code_stub = CodeStub(
                id="CARDINAL|ANALYZED|1",
                type="CARDINAL",
                code="ANALYZED",
                version="1"
            )
            # Drop the internal TO_BE_ANALYZED tag, then add the diagnosis and ANALYZED tags
            filtered_tags = [
                tag for tag in service.tags
                if not (tag.type == "CARDINAL" and tag.code == "TO_BE_ANALYZED")
            ]
            service.tags = filtered_tags + [inference_result, analyzed_code_stub]
            if service.contact_id is not None:
                contact = sdk.contact.get_contact_blocking(service.contact_id)
                # Replace the old version of the service inside its parent contact
                filtered_services = [s for s in contact.services if s.id != service.id]
                contact.services = filtered_services + [service]
                sdk.contact.modify_contact_blocking(contact)
                print("Successfully updated contact")
            else:
                print("Cannot find parent contact")
        else:
            print(f"Service with id {service.id} does not contain a valid measure.")
    else:
        print(f"Unexpected event: {event.type}")
while (!subscription.isClosed) {
    const event = await subscription.waitForEvent(10 * 1000)
    if (event === null) {
        console.log("No event yet")
    } else if (event.$ktClass === EntitySubscriptionEvent.Connected.$ktClass) {
        console.log("Successfully opened connection")
    } else if (event instanceof EntitySubscriptionEvent.EntityNotification) {
        const service = await sdk.contact.decryptService(event.entity)
        console.log(`Received service ${service.id}`)
        const measure = service.content["en"]?.measureValue?.value
        // Compare against null/undefined explicitly so that a value of 0 is not discarded
        if (measure != null) {
            let inferenceResult: CodeStub
            if (measure < 80) {
                // Snomed code for Hypoglycemia
                inferenceResult = new CodeStub({
                    id: "SNOMED|302866003|1",
                    type: "SNOMED",
                    code: "302866003",
                    version: "1"
                })
            } else if (measure > 130) {
                // Snomed code for Hyperglycemia
                inferenceResult = new CodeStub({
                    id: "SNOMED|80394007|1",
                    type: "SNOMED",
                    code: "80394007",
                    version: "1"
                })
            } else {
                // Snomed code for normal range
                inferenceResult = new CodeStub({
                    id: "SNOMED|260395002|1",
                    type: "SNOMED",
                    code: "260395002",
                    version: "1"
                })
            }
            const analyzedCodeStub = new CodeStub({
                id: "CARDINAL|ANALYZED|1",
                type: "CARDINAL",
                code: "ANALYZED",
                version: "1"
            })
            const updatedService = new DecryptedService({
                ...service,
                tags: service.tags
                    .filter(it => it.type !== "CARDINAL" || it.code !== "TO_BE_ANALYZED")
                    .concat(inferenceResult, analyzedCodeStub)
            })
            if (!!updatedService.contactId) {
                // Use the same value that was just checked
                const contact = await sdk.contact.getContact(updatedService.contactId)
                await sdk.contact.modifyContact(
                    new DecryptedContact({
                        ...contact,
                        services: contact.services
                            .filter(it => it.id !== updatedService.id)
                            .concat(updatedService)
                    })
                )
                console.log("Successfully updated contact")
            } else {
                console.log("Cannot find parent contact")
            }
        } else {
            console.log(`Service with id ${service.id} does not contain a valid measure.`)
        }
    } else {
        console.log(`Unexpected event: ${event.$ktClass}`)
    }
}