iOS, . Android, , . , , , Android - , .
, , : WAMP, .
- @antoha-gs, - , .
WAMP. , WebSocket PubSub RPC . WAMP โ , Java/Kotlin, .
. , , , . โ , . - .
, , Sendbird โ , . . Sendbird .
:
ยซ, Socket.IO, : 1) . 2) . WAMP โ โ . - MQTTยป.
, , . , / (at most/at least/exactly), . , , at most once, WAMP . .
MQTT โ , , , WAMP, . XMPP (aka Jabber), , MQTT WAMP, , ยซยป . , , , , .
, , , . .
-
, WAMP .
. .
(PUBLISH โ 16, SUBSCRIBE โ 32 ). QA ( , [33,11,5862354]).
(, ) id . - . ( id ):client โ [32,18,{},"co.fun.chat.testChatId"]backend โ [33,18,5868752 (id )]client โ id [34,20,5868752]
OkHttp (, , , ping-pong ) RxJava, โ event-based programming, Rx, , .
, WAMP- OkHttpClient:
val request = Request.Builder()
.url(ChatsConfig.SOCKETURL)
.addHeader("Connection", "Upgrade")
.addHeader("Sec-WebSocket-Protocol", "wamp.json")
.addHeader("Authorization", authToken)
.build()
val listener = ChatWebSocketListener()
webSocket = okHttpClient.newWebSocket(request, listener)
ChatWebSocketListener:
private inner class ChatWebSocketListener : WebSocketListener() {
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
connectionStatusSubject.onNext(ChatConnectionStatuses.NOTCONNECTED)
//subject, ( UI , - )
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
webSocket.close(1000, null)
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
onConnectionError("${t.message} ${response?.body}")
}
override fun onMessage(webSocket: WebSocket, text: String) {
socketMessagesSubject.onNext(serverMessageFactory.processMessage(text)) //subject, , (. )
}
override fun onOpen(webSocket: WebSocket, response: Response) {
authorize()
}
}
, String, JSON, WAMP :
[ResultCode: Int, RequestId: Long, ArgumentsMap: JsonObject ]
:
[50, 7, {"type":100, "chats":[ ]}]
Gson. data- :
@DontObfuscate
data class ChatListResponse(@SerializedName("chats") val chatList: List<Chat>)
:
private fun chatListUpdateInternal(jsonChatsResponse: JSONObject):
ChatsListUpdatesEvent {
return gson.fromJson(jsonChatsResponse.toString(),
ChatsListUpdatesEvent::class.java)
}
. WAMP :
sealed class WampMessage {
class BaseMessage(val wampId: Int, val seq: Long, val jsonData: JSONArray) : WampMessage()
class ErrorMessage(val procedureId: Int, val seq: Long, val jsonData: JSONArray) : WampMessage()
object WelcomeMessage : WampMessage()
class AbortMessage(val jsonData: JSONArray) : WampMessage()
}
:
fun getCallMessage(rpc: String,
options: Map<String, Any> = emptyMap(),
arguments: List<Any?> = emptyList(),
argumentsDict: Map<String, Any?> = emptyMap()):
WampMessage.BaseMessage {
//[CALL, Request|id, Options|dict, Procedure|uri, Arguments|list]
val seq = nextSeq.getAndIncrement()
return WampMessage.BaseMessage(WAMP.MessageIds.CALL,
seq,
JSONArray(listOfNotNull(WAMP.MessageIds.CALL,
seq,
options,
rpc,
arguments,
argumentsDict)))
}
:
val messages: Observable<WampMessage> = socketMessagesSubject
fun sendMessage(msgToSend: WampMessage.BaseMessage):
Observable<WampMessage> {
return messages.filter {
it is WampMessage.BaseMessage && it.seq == msgToSend.seq
}
.take(1)
.doOnSubscribe {
webSocket.send(msgToSend.jsonData.toString())
}
}
WAMP seq, , .
:
companion object {
private val nextSeq: AtomicLong = AtomicLong(1)
}
fun getNextSeq() = nextSeq.getAndIncrement()
WAMP Subscriptions
WAMP โ , () - , . :
;
;
- ;
;
(, );
.
:
[SUBSCRIBE: Int, RequestId: Long, Options: Map, Topic: String]
topic โ , .
:
fun getSubscribeMessage(topic: String, options: Map<String, Any> = emptyMap()):
WampMessage.BaseMessage {
val seq = nextSeq.getAndIncrement()
return WampMessage.BaseMessage(WAMP.MessageIds.SUBSCRIBE,
seq,
JSONArray(listOfNotNull(WAMP.MessageIds.SUBSCRIBE,
seq,
options,
topic)))
}
, (, ), . WAMP: subscribe- id , , โ id, .
API , :
private val subscriptionsMap = ArrayMap<String, Long>()
private fun getBaseSubscription(topic: String): Observable<WampMessage> {
val msg = wampClientMessageFactory.getSubscribeMessage(topic)
return send(msg).map {
val subscriptionId = converter.getSubscriptionId((it.asBaseMessage()).jsonData)
subscriptionsMap[topic] = subscriptionId
subscriptionId
}
.switchMap { subscriptionId ->
chatClient.messages.filter {
it.isMessageFromSubscription(subscriptionId)
}
}
}
id, , :
fun unsubscribeFromTopic(topic: String) {
if (!subscriptionsMap.contains(topic)) {
return
}
val msg =
wampClientMessageFactory.getUnsubscribeMessage(subscriptionsMap[topic])
send(msg, true).exSubscribe()
subscriptionsMap.remove(topic)
}