Cats Concurrency Basics with Ref and Deferred

Concurrent access and referential transparency







* Concurrency: the ability to run several computational processes so that their execution overlaps in time.





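Ref and Deferred are the basic building blocks for doing things concurrently in FP. Especially when combined with tagless final (an abstraction over the effect type), they give us both concurrent access and referential transparency, and they can be used to build more advanced structures such as counters and state machines.

Before digging into Ref and Deferred, it helps to know that concurrency in Cats is built on the Java AtomicReference, so that is where we start.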





Atomic Reference

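AtomicReference is one of the classes in the java.util.concurrent.atomic package. The Oracle docs describe java.util.concurrent.atomic as:

"A small toolkit of classes that support lock-free thread-safe programming on single variables. In essence, the classes in this package extend the notion of volatile values, fields, and array elements to those that also provide an atomic conditional update operation ..."

"Instances of classes AtomicBoolean, AtomicInteger, AtomicLong, and AtomicReference each provide access and updates to a single variable of the corresponding type."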





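AtomicReference has been available since Java 1.5 and is used to get better performance than synchronization (although this is not always the case).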





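When several threads share a piece of data, access to that data has to be protected. The simplest example is incrementing an int: i = i + 1. This statement actually consists of 3 operations: first the value of i is read, then 1 is added to it, and finally the result is assigned back to i. In a multithreaded application each thread can perform these 3 steps interleaved with the steps of another thread, so the final value of i cannot be predicted.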





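The usual reaction is to reach for the synchronized keyword or a lock, but with atomic.* explicit synchronization is no longer needed: the atomic utility types check for us that the operation was carried out as a single step.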
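The difference can be observed directly. The following is a minimal sketch in Scala against the plain JDK; the object name CounterRace and the helper hammer are made up for this illustration and are not part of any library.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CounterRace {
  // Runs `body` `perThread` times on each of `threads` JVM threads and waits for all of them.
  def hammer(threads: Int, perThread: Int)(body: () => Unit): Unit = {
    val ts = List.fill(threads)(new Thread(() => (1 to perThread).foreach(_ => body())))
    ts.foreach(_.start())
    ts.foreach(_.join())
  }

  def main(args: Array[String]): Unit = {
    var plain  = 0                     // read, add and write are three separate steps
    val atomic = new AtomicInteger(0)  // incrementAndGet performs the update atomically

    hammer(threads = 4, perThread = 100000) { () =>
      plain = plain + 1
      atomic.incrementAndGet()
    }

    println(s"plain  = $plain (may be lower than 400000 because updates get lost)")
    println(s"atomic = ${atomic.get}")
  }
}
```

The atomic counter always ends at 400000, while the plain variable may end lower, and by a different amount on each run.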





Take AtomicInteger.incrementAndGet as an example (this is the loop-based implementation found in older JDK versions):





/**
 * Atomically increments by one the current value.
 *
 * @return the updated value
 */
public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}
      
      



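compareAndSet either updates the value or fails, but it never makes the thread wait. If compareAndSet fails inside incrementAndGet, the whole operation is simply retried, starting with a fresh read of the current value through get(). The result is safe: the value is updated in a single guarded statement, with no risk that another thread changes it in between and with no blocking.

This retry-instead-of-wait approach is the basis of lock-free concurrency.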
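To make the "update or fail" behaviour concrete, here is a small single-threaded sketch in which the competing write is simulated by hand. The object name CasDemo is an assumption made for the example; only JDK classes are used.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CasDemo {
  def main(args: Array[String]): Unit = {
    val counter = new AtomicInteger(10)

    val seen = counter.get()     // we read 10
    counter.incrementAndGet()    // simulated competing writer moves the value to 11

    // Expected 10 but the current value is 11, so nothing is written.
    val firstTry = counter.compareAndSet(seen, seen + 1)
    println(s"first attempt succeeded: $firstTry")   // false

    // Retry with a fresh read, exactly as the loop in incrementAndGet does.
    val fresh     = counter.get()
    val secondTry = counter.compareAndSet(fresh, fresh + 1)
    println(s"second attempt succeeded: $secondTry, value = ${counter.get()}") // true, 12
  }
}
```

The failed attempt returns immediately instead of waiting, which is why a loop around compareAndSet never blocks a thread.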





Ref

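Ref in Cats is very similar to the Java atomic reference described above. The main differences are that Ref is used with the tagless final abstraction F, that it always contains a value, and that the value it holds, of type A, is always immutable.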





abstract class Ref[F[_], A] {
  def get: F[A]
  def set(a: A): F[Unit]
  def modify[B](f: A => (A, B)): F[B]
  // ... and more
}
      
      



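Ref[F[_], A] is a functional mutable reference:

  • Concurrent

  • Lock free

  • Always contains a value

It is created by supplying an initial value, and every operation on it is an effect in F, for example cats.effect.IO.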





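If we look closely at the companion object of the Cats Ref, we can see that F has to satisfy one requirement: it must be Sync.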





def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))
      
      



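The method above is just one of the many operations available for Ref; it is used to construct a Ref with an initial value.

Sync gives us the ability to suspend any side effect with the delay method, and this is done for every operation on Ref.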





Ref is quite a simple construct; to understand how it works we can concentrate mainly on get, set and of.
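A minimal sketch of these three operations together, assuming Cats Effect 2.x on the classpath (the signatures shown in this article match that version; in Cats Effect 3 the imports differ). The object name RefBasics is made up for the example.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref

object RefBasics {
  // A description only: create a Ref holding 0, overwrite it with 42, read it back.
  // Nothing is allocated or mutated until the IO is run.
  val program: IO[Int] =
    for {
      ref   <- Ref.of[IO, Int](0)
      _     <- ref.set(42)
      value <- ref.get
    } yield value

  def main(args: Array[String]): Unit = {
    println(program.unsafeRunSync()) // 42
    println(program.unsafeRunSync()) // 42 again: each run allocates its own Ref
  }
}
```

Because of returns F[Ref[F, A]] rather than a bare Ref, allocating the mutable cell is itself an effect, which is what keeps code using Ref referentially transparent.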





get and set





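Suppose we have an object (called Shared in this tutorial) that has to be updated by several threads, and we use the get and set methods to do so, wrapped in a utility method that will come in handy later: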





def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] = {
  for {
    sh <- trace.get                   // read the current value
    _  <- trace.set(Shared(sh, msg))  // write a new value built from it, as a separate step
  } yield ()
}
      
      



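A Shared value is built from its previous state and a new value, producing a new Shared instance. It could really be anything we need concurrently safe access to: a simple list, a map, and so on.

For this purpose I simply created Shared(prev: Shared, msg: String).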





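In the example above F has been replaced by the concrete IO from Cats Effect, but keep in mind that Ref is polymorphic in F and can be used with other libraries.

Since IO is monadic, each step is chained with flatMap, and the value stored in our Ref gets set... or so it seems. In fact, this is not quite right.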





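With this implementation, every call to modifyShared can lose updates! This happens because, for example, two threads may both read the same value with get and then each perform its own set, so that one write overwrites the other. get followed by set is not executed atomically.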





Atomic update







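We can of course improve the example above by using the other methods available on Ref. To perform get and set together we can use update.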





def update(f: A => A): F[Unit] 
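As a sketch of how modifyShared could be rewritten with update, assuming Cats Effect 2.x and Cats 2.x. The Shared case class below is a hypothetical stand-in (it uses Option for the previous state so that an initial value can be built), and depth is a helper added only to count the recorded messages.

```scala
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Ref
import cats.implicits._
import scala.concurrent.ExecutionContext

object AtomicTrace {
  // Hypothetical stand-in for the article's Shared: each value links to the previous one.
  final case class Shared(prev: Option[Shared], msg: String)

  // Number of linked values, i.e. how many messages were recorded.
  def depth(s: Shared): Int = {
    @annotation.tailrec
    def go(cur: Option[Shared], acc: Int): Int = cur match {
      case Some(x) => go(x.prev, acc + 1)
      case None    => acc
    }
    go(Some(s), 0)
  }

  // Read and write happen in one atomic step, so concurrent calls cannot lose updates.
  def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] =
    trace.update(sh => Shared(Some(sh), msg))

  // Needed in Cats Effect 2.x to run IOs in parallel.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program: IO[Int] =
    for {
      trace <- Ref.of[IO, Shared](Shared(None, "start"))
      _     <- (1 to 100).toList.parTraverse(i => modifyShared(trace, s"msg $i"))
      last  <- trace.get
    } yield depth(last)

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync()) // 101: the initial value plus 100 updates
}
```

With the get / set version of modifyShared, the same program could finish with fewer than 101 linked values.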
      
      



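This solves the problem of updating the value, but update has its own drawback. If we want to read the variable right after the update, just as we did with get and set, the value we read may no longer be the one our update produced, because another thread may have changed it in between. Suppose our Ref holds an Int: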





for {
  _    <- someRef.update(_ + 1)
  curr <- someRef.get
  _    <- IO { println(s"current value is $curr") }
} yield ()
      
      



modify



 





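We can overcome this problem with modify, which does the same as update but additionally returns a value computed in the same atomic step, so that it can be used afterwards.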





def modify[B](f: A => (A, B)): F[B] = {
  @tailrec
  def spin: B = {
    val c = ar.get
    val (u, b) = f(c)
    if (!ar.compareAndSet(c, u)) spin
    else b
  }
  F.delay(spin)
}
      
      



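As you can see, this is almost the same implementation as the AtomicInteger.incrementAndGet example shown at the beginning, only written in Scala. It makes clear that Ref also relies on an AtomicReference to do its job.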
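A short usage sketch of modify, assuming Cats Effect 2.x. The helper incrementAndGet below is written for this example (it is not a method of Ref); it returns the value produced by its own update rather than whatever a later get would see.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref

object ModifyDemo {
  // Atomically increments the counter and returns the value this call produced.
  def incrementAndGet(ref: Ref[IO, Int]): IO[Int] =
    ref.modify { current =>
      val next = current + 1
      (next, next) // (new state, result handed back to the caller)
    }

  val program: IO[(Int, Int)] =
    for {
      ref    <- Ref.of[IO, Int](0)
      first  <- incrementAndGet(ref)
      second <- incrementAndGet(ref)
    } yield (first, second)

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync()) // (1,2)
}
```

The function passed to modify should be pure, since the retry loop may run it more than once.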





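Ref limitations

You have probably noticed that when an update fails, the function passed to update / modify has to be run again, nondeterministically and possibly several times. The good news is that this approach generally turns out to be much faster than the standard locking and synchronization mechanism, and much safer, because it cannot deadlock.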





Now that we know how the simple Ref works, we can move on to another class from Cats Concurrent: Deferred.





Deferred

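Unlike Ref, Deferred:

  • is created empty (its result is deferred)

  • can be completed only once

  • once set, cannot be modified or become empty again.

These properties make Deferred simple and interesting at the same time.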





abstract class Deferred[F[_], A] {
  def get: F[A]
  def complete(a: A): F[Unit]
}
      
      



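Deferred is used for explicit functional synchronization. Calling get on an empty Deferred blocks until a value becomes available. According to the documentation of the class itself:

  • blocking is semantic only, no actual threads are blocked by the implementation

Calling get on a non-empty Deferred returns the stored value immediately.

The other method, complete, fills in the value if the instance is empty; called on a non-empty Deferred it results in a failure (a failed IO).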





It is worth mentioning that Deferred requires F to be Concurrent, which means that waiting on it can be cancelled.
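The get and complete semantics described above can be sketched as follows, assuming Cats Effect 2.x (where complete returns F[Unit] and fails on a second call). The object name DeferredBasics is made up for the example.

```scala
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Deferred
import scala.concurrent.ExecutionContext

object DeferredBasics {
  // Required by Concurrent[IO] and by start in Cats Effect 2.x.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program: IO[(Int, Boolean)] =
    for {
      d      <- Deferred[IO, Int]
      waiter <- d.get.start             // the fiber waits semantically, no thread is blocked
      _      <- d.complete(42)          // fills the empty Deferred and wakes the waiter
      value  <- waiter.join
      second <- d.complete(43).attempt  // completing a second time fails
    } yield (value, second.isLeft)

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync()) // (42,true)
}
```

The result is the same whether the waiting fiber reaches get before or after complete runs, because a completed Deferred returns its value immediately.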





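A good use case for Deferred is when one part of an application has to wait for another.

The example below is taken from the talk "Composable Concurrency with Ref + Deferred" given at Scala Italy 2019, available at Vimeo.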





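// Consumer and Producer are not defined in this article; they stand for any messaging client.

// Sets up the consumer, signals readiness through `done`, then reads a message.
def consumer(done: Deferred[IO, Unit]) = for {
  c   <- Consumer.setup
  _   <- done.complete(())
  msg <- c.read
  _   <- IO(println(s"Received $msg"))
} yield ()

// Waits on `done` until the consumer is ready, then writes a message.
def producer(done: Deferred[IO, Unit]) = for {
  p   <- Producer.setup()
  _   <- done.get
  msg = "Msg A"
  _   <- p.write(msg)
  _   <- IO(println(s"Sent $msg"))
} yield ()

// Starts both sides on separate fibers, sharing a single Deferred.
def prog = for {
  d <- Deferred[IO, Unit]
  _ <- consumer(d).start
  _ <- producer(d).start
} yield ()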
      
      



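In the example above we have a producer and a consumer, and we want the producer to wait until the consumer setup has finished before it writes any messages; otherwise whatever the producer writes would be lost. To solve this, both sides share a Deferred instance, and the producer blocks on get until the done Deferred has been filled with a value by the consumer (here the value is simply Unit, ()).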





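This solution is not without problems: if the consumer setup never finishes, the producer stays stuck waiting and never sends any messages. To guard against this we can put a timeout on get, and store Either[Throwable, Unit] or some other construct in the Deferred instead of a plain Unit.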
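One possible shape of that guard, as a sketch assuming Cats Effect 2.x and Cats 2.x. The setup function, its fail flag and the 2 second limit are hypothetical placeholders, and the producer only returns a string instead of writing a real message.

```scala
import cats.effect.{ContextShift, IO, Timer}
import cats.effect.concurrent.Deferred
import cats.implicits._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object GuardedHandshake {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

  // Hypothetical setup step standing in for real consumer initialisation.
  def setup(fail: Boolean): IO[Unit] =
    if (fail) IO.raiseError(new RuntimeException("setup failed")) else IO.unit

  // Reports success or failure of setup through the Deferred instead of leaving it empty.
  def consumer(done: Deferred[IO, Either[Throwable, Unit]], fail: Boolean): IO[Unit] =
    setup(fail).attempt.flatMap(done.complete)

  // Waits at most 2 seconds, then turns a reported failure back into a failed IO.
  def producer(done: Deferred[IO, Either[Throwable, Unit]]): IO[String] =
    done.get.timeout(2.seconds).rethrow.as("sent")

  def run(fail: Boolean): IO[Either[Throwable, String]] =
    for {
      d   <- Deferred[IO, Either[Throwable, Unit]]
      _   <- consumer(d, fail).start
      res <- producer(d).attempt
    } yield res

  def main(args: Array[String]): Unit = {
    println(run(fail = false).unsafeRunSync().isRight) // true: setup succeeded
    println(run(fail = true).unsafeRunSync().isLeft)   // true: the failure reached the producer
  }
}
```

If the consumer never completes the Deferred at all, the timeout makes the producer fail after the limit instead of waiting forever.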





Deferred is rather simple, but combined with Ref it can be used to build more complex data structures such as semaphores.





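For more information I recommend the Cats documentation itself, where you can learn more about Cats concurrency and the data structures it provides.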





