Learning Scala: Part 4 - WebSocket



Hello, Habr! This time I tried to make a simple chat via Websockets. For details, welcome under cat.



Content





Links



  1. Source codes
  2. Docker images
  3. Tapir
  4. Http4s
  5. Fs2
  6. Doobie
  7. ScalaTest
  8. ScalaCheck
  9. ScalaTestPlusScalaCheck


Actually all the code is in one ChatHub object



class ChatHub[F[_]] private(
                             val topic: Topic[F, WebSocketFrame],
                             private val ref: Ref[F, Int]
                           )
                           (
                             implicit concurrent: Concurrent[F],
                             timer: Timer[F]
                           ) extends Http4sDsl[F] {

  val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("   .    : ws://localhost:8080/chat")
    .description("   ")
    .in(
      stringBody
        .description("      ")
        .example("!")
    )
    .out(
      stringBody
        .description("  -   ")
        .example("6 :     Id  f518a53d: !")
    )
    //    . 
    .serverLogic(_ => IO(Left(()): Either[Unit, String]))

  def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }

  private def logic(): F[Response[F]] = {
    val toClient: Stream[F, WebSocketFrame] =
      topic.subscribe(1000)
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
      handle
    WebSocketBuilder[F].build(toClient, fromClient)
  }

  private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
    .through(topic.publish)
}

object ChatHub {

  def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
    ref <- Ref.of[F, Int](0)
    topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
  } yield new ChatHub(topic, ref)
}


Here you must immediately say about Topic - a synchronization primitive from Fs2 that allows you to make a Publisher - Subscriber model, and you can have many Publishers and many Subscriber at the same time. In general, it is better to send messages to it through some kind of buffer like Queue because it has a limit on the number of messages in the queue and Publisher waits until all Subscriber receive messages in their message queue and if it is overflowed it may hang.



val topic: Topic[F, WebSocketFrame],


Here I also count the number of messages that were sent to the chat as the number of each message. Since I need to do this from different threads, I used an analogue of Atomic, which is called Ref here and guarantees the atomicity of the operation.



  private val ref: Ref[F, Int]


Processing a stream of messages from users.



  private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] = 
    stream
//       . 
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
//               .
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//     
    .through(topic.publish)


Actually the very logic of creating a socket.



private def logic(): F[Response[F]] = {
//    .
    val toClient: Stream[F, WebSocketFrame] =
//        
      topic.subscribe(1000)
//        
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
//      
      handle
//         .
    WebSocketBuilder[F].build(toClient, fromClient)
  }


We bind our socket to a route on the server (ws: // localhost: 8080 / chat)



def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }


Actually, that's all. Then you can start the server with this route. I still wanted to make any kind of documentation. In general, for documenting WebSocket and other event-based interaction like RabbitMQ AMPQ, there is AsynAPI, but there is nothing under Tapir, so I just made a description of the endpoint for Swagger as a GET request. Of course, he will not work. More precisely, a 501 error will be returned, but it will be displayed in Swagger



  val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("   .    : ws://localhost:8080/chat")
    .description("   ")
    .in(
      stringBody
        .description("      ")
        .example("!")
    )
    .out(
      stringBody
        .description("  -   ")
        .example("6 :     Id  f518a53d: !")
    )


In the swagger itself it looks like this.Connect







our chat to our API server



    todosController = new TodosController()
    imagesController = new ImagesController()
//   
    chatHub <- Resource.liftF(ChatHub[IO]())
    endpoints = todosController.endpoints ::: imagesController.endpoints
//     Swagger
    docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
    yml: String = docs.toYaml
//      
    routes = chatHub.routeWs <+>
      endpoints.toRoutes <+>
      new SwaggerHttp4s(yml, "swagger").routes[IO]
    httpApp = Router(
      "/" -> routes
    ).orNotFound
    blazeServer <- BlazeServerBuilder[IO](serverEc)
      .bindHttp(settings.host.port, settings.host.host)
      .withHttpApp(httpApp)
      .resource


We connect to the chat with an extremely simple script.



    <script>
        const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
        const webSocket = new WebSocket('ws://localhost:8080/chat');

        webSocket.onopen = event => {
            alert('onopen ');
        };

        webSocket.onmessage = event => {
            console.log(event);
            receive(event.data);
        };

        webSocket.onclose = event => {
            alert('onclose ');
        };

        function send() {
            let text = document.getElementById("message");
            webSocket.send(`    Id  ${id}: ${text.value}`);
            text.value = '';
        }

        function receive(m) {
            let text = document.getElementById("chat");
            text.value = text.value + '\n\r' + m;
        }
    </script>


This is actually all. I hope someone who is also studying the rock will be interested in this article and maybe even useful.



All Articles