Chats on websockets when on the WAMP backend. Now about Android

iOS, . Android, , . , , , Android - , .





, , : WAMP, .





- @antoha-gs, - , .





WAMP. , WebSocket PubSub RPC . WAMP — , Java/Kotlin, .





. , , , . โ€” , . - .





, , Sendbird — , . . Sendbird .





:





«, Socket.IO, : 1) . 2) . WAMP — — . - MQTT».





, , . , / (at most/at least/exactly), . , , at most once, WAMP . .





MQTT — , , , WAMP, . XMPP (aka Jabber), , MQTT WAMP, , «» . , , , , .





, , , . .





-

, WAMP .





  • . .





  • (PUBLISH — 16, SUBSCRIBE — 32). QA ( , [33,11,5862354]).





  • (, ) id . - . ( id ): client → [32,18,{},"co.fun.chat.testChatId"]; backend → [33,18,5868752 (id )]; client → id [34,20,5868752]





OkHttp (, , , ping-pong ) RxJava, — event-based programming, Rx, , .





, WAMP- OkHttpClient: 





// Handshake request for the chat socket: it asks for the "wamp.json" subprotocol
// and carries the auth token in the Authorization header.
val request = Request.Builder().run {
    url(ChatsConfig.SOCKETURL)
    addHeader("Connection", "Upgrade")
    addHeader("Sec-WebSocket-Protocol", "wamp.json")
    addHeader("Authorization", authToken)
    build()
}
// Socket callbacks are delivered to this listener.
val listener = ChatWebSocketListener()
webSocket = okHttpClient.newWebSocket(request, listener)
      
      



ChatWebSocketListener:





/**
 * Forwards WebSocket callbacks into the client's subjects and handlers.
 *
 * Text frames are parsed with `serverMessageFactory` and published on
 * `socketMessagesSubject`; a closed socket is reported on `connectionStatusSubject`.
 */
private inner class ChatWebSocketListener : WebSocketListener() {

    /** Starts authorization as soon as the socket is open. */
    override fun onOpen(webSocket: WebSocket, response: Response) {
        authorize()
    }

    /** Parses the incoming text frame and publishes the result to the message subject. */
    override fun onMessage(webSocket: WebSocket, text: String) {
        val parsedMessage = serverMessageFactory.processMessage(text)
        socketMessagesSubject.onNext(parsedMessage)
    }

    /** Answers a closing socket by closing it with status code 1000 and no reason. */
    override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
        webSocket.close(1000, null)
    }

    /** Publishes the NOTCONNECTED status once the socket has been closed. */
    override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
        connectionStatusSubject.onNext(ChatConnectionStatuses.NOTCONNECTED)
    }

    /** Passes the throwable's message and the response body (if any) to the error handler. */
    override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
        val description = "${t.message} ${response?.body}"
        onConnectionError(description)
    }
}
      
      



, String, JSON, WAMP :





[ResultCode: Int, RequestId: Long, ArgumentsMap: JsonObject ]
      
      



:





[50, 7, {"type":100, "chats":[ ]}]
      
      



Gson. data- :





/**
 * Gson model of a chat-list payload: the JSON field `chats` is read into [chatList].
 */
@DontObfuscate
data class ChatListResponse(
    @SerializedName("chats")
    val chatList: List<Chat>
)
      
      



:





/**
 * Converts [jsonChatsResponse] into a [ChatsListUpdatesEvent].
 *
 * The JSON object is rendered to its string form and that string is parsed by Gson.
 */
private fun chatListUpdateInternal(jsonChatsResponse: JSONObject): ChatsListUpdatesEvent {
    val rawJson = jsonChatsResponse.toString()
    return gson.fromJson(rawJson, ChatsListUpdatesEvent::class.java)
}
      
      



. WAMP : 





/**
 * Closed hierarchy of the message kinds the chat client works with.
 */
sealed class WampMessage {

    /**
     * Regular message: [wampId] is the message-type id, [seq] the request sequence number,
     * and [jsonData] the JSON array that goes with it.
     */
    class BaseMessage(
        val wampId: Int,
        val seq: Long,
        val jsonData: JSONArray
    ) : WampMessage()

    /**
     * Error message carrying a [procedureId], a [seq] and the JSON array [jsonData].
     * NOTE(review): presumably [seq] is the id of the failed request - confirm against the parser.
     */
    class ErrorMessage(
        val procedureId: Int,
        val seq: Long,
        val jsonData: JSONArray
    ) : WampMessage()

    /** Welcome message; it carries no data. */
    object WelcomeMessage : WampMessage()

    /** Abort message together with its JSON array [jsonData]. */
    class AbortMessage(val jsonData: JSONArray) : WampMessage()
}
      
      



:





/**
 * Builds a CALL message for the procedure [rpc].
 *
 * The JSON array is laid out as
 * `[CALL, Request|id, Options|dict, Procedure|uri, Arguments|list, ArgumentsKw|dict]`,
 * where the request id is the next value of the `nextSeq` counter. The same id is
 * stored as the `seq` of the returned message.
 *
 * @param rpc URI of the procedure to call.
 * @param options call options; empty by default.
 * @param arguments positional arguments; empty by default.
 * @param argumentsDict keyword arguments; empty by default.
 */
fun getCallMessage(
    rpc: String,
    options: Map<String, Any> = emptyMap(),
    arguments: List<Any?> = emptyList(),
    argumentsDict: Map<String, Any?> = emptyMap()
): WampMessage.BaseMessage {
    val requestId = nextSeq.getAndIncrement()
    val callId = WAMP.MessageIds.CALL
    val payload = JSONArray(
        listOfNotNull(callId, requestId, options, rpc, arguments, argumentsDict)
    )
    return WampMessage.BaseMessage(callId, requestId, payload)
}
      
      







/** Every message published on `socketMessagesSubject`, exposed as a read-only stream. */
val messages: Observable<WampMessage> = socketMessagesSubject

/**
 * Sends [msgToSend] and returns a stream that emits the first [WampMessage.BaseMessage]
 * whose `seq` equals the request's `seq`.
 *
 * Nothing is written to the socket until the returned observable is subscribed to.
 * Only `BaseMessage` instances pass the filter, so an `ErrorMessage` carrying the same
 * `seq` is not delivered through this stream.
 */
fun sendMessage(msgToSend: WampMessage.BaseMessage): Observable<WampMessage> {
    val expectedSeq = msgToSend.seq
    return messages
        .filter { incoming -> incoming is WampMessage.BaseMessage && incoming.seq == expectedSeq }
        .take(1)
        .doOnSubscribe { webSocket.send(msgToSend.jsonData.toString()) }
}
      
      



WAMP seq, , .





:





/** Returns the current sequence number and advances the shared counter by one. */
fun getNextSeq(): Long = nextSeq.getAndIncrement()

companion object {
    // Counter behind the request sequence numbers; it starts at 1 and, living in the
    // companion object, is shared by all instances of the enclosing class.
    private val nextSeq = AtomicLong(1)
}
      
      



WAMP Subscriptions 

WAMP — , () - , . :





  • ;





  • ;





  • - ;





  • ;





  • (, );





  • .





:





[SUBSCRIBE: Int, RequestId: Long, Options: Map, Topic: String]
      
      



topic — , . 





:





/**
 * Builds a SUBSCRIBE message for [topic].
 *
 * The JSON array is laid out as `[SUBSCRIBE, Request|id, Options|dict, Topic|uri]`,
 * where the request id is the next value of the `nextSeq` counter. The same id is
 * stored as the `seq` of the returned message.
 *
 * @param topic topic to subscribe to.
 * @param options subscription options; empty by default.
 */
fun getSubscribeMessage(
    topic: String,
    options: Map<String, Any> = emptyMap()
): WampMessage.BaseMessage {
    val requestId = nextSeq.getAndIncrement()
    val subscribeId = WAMP.MessageIds.SUBSCRIBE
    val payload = JSONArray(listOfNotNull(subscribeId, requestId, options, topic))
    return WampMessage.BaseMessage(subscribeId, requestId, payload)
}
      
      



, (, ), . WAMP: subscribe- id , , — id, .





API , :





// Topic -> subscription id, recorded when the reply to a subscribe request arrives.
private val subscriptionsMap = ArrayMap<String, Long>()

/**
 * Subscribes to [topic] and returns the stream of messages belonging to that subscription.
 *
 * A subscribe message is sent; the subscription id is extracted from the reply,
 * stored in [subscriptionsMap] under [topic], and then used to filter
 * `chatClient.messages` down to the messages of this subscription.
 */
private fun getBaseSubscription(topic: String): Observable<WampMessage> {
    val subscribeRequest = wampClientMessageFactory.getSubscribeMessage(topic)
    return send(subscribeRequest)
        .map { reply -> converter.getSubscriptionId(reply.asBaseMessage().jsonData) }
        .doOnNext { subscriptionId -> subscriptionsMap[topic] = subscriptionId }
        .switchMap { subscriptionId ->
            chatClient.messages.filter { incoming ->
                incoming.isMessageFromSubscription(subscriptionId)
            }
        }
}
      
      



id, , :





/**
 * Unsubscribes from [topic] if a subscription id has been recorded for it.
 *
 * Does nothing when [topic] has no entry in `subscriptionsMap`. Otherwise an
 * unsubscribe message for the stored id is sent and the entry is removed.
 */
fun unsubscribeFromTopic(topic: String) {
    // Single null-checked lookup: avoids the contains-then-get pair and never hands a
    // nullable id to the message factory.
    val subscriptionId = subscriptionsMap[topic] ?: return
    val msg = wampClientMessageFactory.getUnsubscribeMessage(subscriptionId)
    // NOTE(review): the meaning of the `true` flag is defined by `send`, outside this block.
    send(msg, true).exSubscribe()
    subscriptionsMap.remove(topic)
}
      
      



, Android, — . , iOS , .








All Articles