feat: [DEVICES] display is active (#141)

* feat: [DEVICES] display is active

* feat: [DEVICES] display is active

---------

Co-authored-by: Florent Champigny <florent@bere.al>
This commit is contained in:
Florent CHAMPIGNY 2025-08-22 16:13:13 +02:00 committed by GitHub
parent c1a55b72fd
commit 52142baecf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 200 additions and 78 deletions

View file

@ -2,8 +2,11 @@ package io.github.openflocon.data.core.device.datasource.remote
import io.github.openflocon.domain.device.models.DeviceIdAndPackageNameDomainModel
import io.github.openflocon.domain.messages.models.FloconIncomingMessageDomainModel
import kotlinx.coroutines.flow.Flow
interface RemoteDeviceDataSource {
val activeDevices: Flow<Set<DeviceIdAndPackageNameDomainModel>> // devices with active websocket connection
fun getDeviceSerial(message: FloconIncomingMessageDomainModel) : String?
suspend fun askForDeviceAppIcon(deviceIdAndPackageName: DeviceIdAndPackageNameDomainModel)
}

View file

@ -1,7 +1,6 @@
package com.flocon.data.remote.device.datasource
import com.flocon.data.remote.common.safeDecodeFromString
import com.flocon.data.remote.database.models.DatabaseOutgoingQueryMessage
import com.flocon.data.remote.device.model.RegisterDeviceDataModel
import com.flocon.data.remote.models.FloconOutgoingMessageDataModel
import com.flocon.data.remote.models.toRemote
@ -10,6 +9,9 @@ import io.github.openflocon.data.core.device.datasource.remote.RemoteDeviceDataS
import io.github.openflocon.domain.Protocol
import io.github.openflocon.domain.device.models.DeviceIdAndPackageNameDomainModel
import io.github.openflocon.domain.messages.models.FloconIncomingMessageDomainModel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.serialization.json.Json
class DeviceRemoteDataSourceImpl(
@ -17,8 +19,19 @@ class DeviceRemoteDataSourceImpl(
private val server: Server,
) : RemoteDeviceDataSource {
override fun getDeviceSerial(message: FloconIncomingMessageDomainModel): String? = json.safeDecodeFromString<RegisterDeviceDataModel>(message.body)
?.serial
override val activeDevices: Flow<Set<DeviceIdAndPackageNameDomainModel>> = server.activeDevices
.map {
it.map {
DeviceIdAndPackageNameDomainModel(
deviceId = it.deviceId,
packageName = it.packageName
)
}.toSet()
}.distinctUntilChanged()
override fun getDeviceSerial(message: FloconIncomingMessageDomainModel): String? =
json.safeDecodeFromString<RegisterDeviceDataModel>(message.body)
?.serial
override suspend fun askForDeviceAppIcon(deviceIdAndPackageName: DeviceIdAndPackageNameDomainModel) {
server.sendMessageToClient(

View file

@ -5,6 +5,7 @@ import com.flocon.data.remote.models.FloconIncomingMessageDataModel
import com.flocon.data.remote.models.FloconOutgoingMessageDataModel
import io.github.openflocon.domain.Constant
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid
@ -19,6 +20,8 @@ interface Server {
deviceIdAndPackageName: FloconDeviceIdAndPackageNameDataModel,
message: FloconOutgoingMessageDataModel,
)
val activeDevices: Flow<Set<FloconDeviceIdAndPackageNameDataModel>>
}
@OptIn(ExperimentalUuidApi::class)

View file

@ -3,7 +3,9 @@ package com.flocon.data.remote.server
import com.flocon.data.remote.models.FloconDeviceIdAndPackageNameDataModel
import com.flocon.data.remote.models.FloconIncomingMessageDataModel
import com.flocon.data.remote.models.FloconOutgoingMessageDataModel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flowOf
class ServerNoOp : Server {
override val receivedMessages = MutableSharedFlow<FloconIncomingMessageDataModel>()
@ -16,6 +18,8 @@ class ServerNoOp : Server {
// no op
}
override val activeDevices: Flow<Set<FloconDeviceIdAndPackageNameDataModel>> = flowOf(emptySet())
override fun stop() {
// no op
}

View file

@ -17,10 +17,14 @@ import io.ktor.websocket.Frame
import io.ktor.websocket.WebSocketSession
import io.ktor.websocket.readReason
import io.ktor.websocket.readText
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.update
import kotlinx.serialization.json.Json
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.time.Duration.Companion.seconds
@ -28,9 +32,14 @@ class ServerJvm : Server {
private val _receivedMessages = MutableSharedFlow<FloconIncomingMessageDataModel>()
override val receivedMessages = _receivedMessages.asSharedFlow()
private var server: EmbeddedServer<NettyApplicationEngine, NettyApplicationEngine.Configuration>? = null
private var server: EmbeddedServer<NettyApplicationEngine, NettyApplicationEngine.Configuration>? =
null
private val isStarted = AtomicBoolean(false)
private val activeSessions = ConcurrentHashMap<FloconDeviceIdAndPackageNameDataModel, WebSocketSession>()
private val _activeSessions =
MutableStateFlow(emptyMap<FloconDeviceIdAndPackageNameDataModel, WebSocketSession>())
override val activeDevices = _activeSessions.map { it.keys }
.distinctUntilChanged()
override fun start(port: Int) {
if (server != null && isStarted.get()) return
@ -46,51 +55,69 @@ class ServerJvm : Server {
routing {
webSocket("/") {
println("WebSocket connection established!")
val currentSession = this // Store the current session to use in finally block
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val receivedText = frame.readText()
println("<---- Received raw message: $receivedText")
try {
val floconIncomingMessageDataModel =
Json.decodeFromString<FloconIncomingMessageDataModel>(
receivedText,
)
activeSessions.put(
FloconDeviceIdAndPackageNameDataModel(
// Use a try-catch block to handle the exception when the channel is closed.
try {
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val receivedText = frame.readText()
println("<---- Received raw message: $receivedText")
try {
val floconIncomingMessageDataModel =
Json.decodeFromString<FloconIncomingMessageDataModel>(
receivedText,
)
val device = FloconDeviceIdAndPackageNameDataModel(
deviceId = floconIncomingMessageDataModel.deviceId,
packageName = floconIncomingMessageDataModel.appPackageName,
),
this,
)
// println("+ new client : ${floconIncomingMessageDataModel.deviceId}")
_receivedMessages.emit(floconIncomingMessageDataModel)
// Access other fields of floconMessage as needed
} catch (e: Exception) {
println("Error parsing JSON message: ${e.message}")
// Optionally send an error back to the client
// outgoing.send(Frame.Text("Error: Could not parse message as FloconIncomingMessageDataModel. ${e.message}"))
)
_activeSessions.update {
it + (device to currentSession)
}
// println("+ new client : ${floconIncomingMessageDataModel.deviceId}")
_receivedMessages.emit(floconIncomingMessageDataModel)
// Access other fields of floconMessage as needed
} catch (e: Exception) {
println("Error parsing JSON message: ${e.message}")
// Optionally send an error back to the client
// outgoing.send(Frame.Text("Error: Could not parse message as FloconIncomingMessageDataModel. ${e.message}"))
}
}
is Frame.Binary -> {
println("Received binary message (not printed): ${frame.data.size} bytes")
}
is Frame.Close -> {
val reason = frame.readReason()
_activeSessions.update { map ->
map.filterValues { it != currentSession }
}
println("WebSocket connection closed: ${reason?.message}")
}
is Frame.Ping -> {
println("Received Ping frame.")
}
is Frame.Pong -> {
println("Received Pong frame.")
}
}
is Frame.Binary -> {
println("Received binary message (not printed): ${frame.data.size} bytes")
}
is Frame.Close -> {
val reason = frame.readReason()
println("WebSocket connection closed: ${reason?.message}")
}
is Frame.Ping -> {
println("Received Ping frame.")
}
is Frame.Pong -> {
println("Received Pong frame.")
}
}
} catch (e: ClosedReceiveChannelException) {
println("WebSocket connection closed (channel closed): ${closeReason.await()}")
} catch (e: Exception) {
println("WebSocket error: ${e.message}")
} finally {
// This block will always be executed when the coroutine ends,
// whether the connection was closed cleanly or due to an error.
_activeSessions.update { map ->
map.filterValues { it != currentSession }
}
println("WebSocket : Session removed from active sessions.")
}
}
}
@ -112,8 +139,11 @@ class ServerJvm : Server {
* @param targetSession The WebSocketSession of the client to send the message to.
* @param message The message to send.
*/
override suspend fun sendMessageToClient(deviceIdAndPackageName: FloconDeviceIdAndPackageNameDataModel, message: FloconOutgoingMessageDataModel) {
val session = activeSessions[deviceIdAndPackageName]
override suspend fun sendMessageToClient(
deviceIdAndPackageName: FloconDeviceIdAndPackageNameDataModel,
message: FloconOutgoingMessageDataModel
) {
val session = _activeSessions.value[deviceIdAndPackageName]
if (session != null) {
try {
val jsonMessage =
@ -126,7 +156,9 @@ class ServerJvm : Server {
} catch (e: Exception) {
println("Error sending message to client: ${e.message}")
// The session might have closed unexpectedly
activeSessions.remove(deviceIdAndPackageName)
_activeSessions.update { map ->
map.filterKeys { it != deviceIdAndPackageName }
}
}
} else {
println("Target session is no longer active. deviceId=$deviceIdAndPackageName, message=$message")