Concurrent access and referential transparency
For prospective students of the "Scala Developer"
course, we have prepared a translation of this material. We also invite you to a webinar on Effects in Scala. In the lesson, we will look at the concept of an effect and the complexity that effects can introduce. We will also introduce the concept of a functional effect, examine its properties, and implement our own small functional effect. Join us.
* Concurrency: the ability to execute several computational processes simultaneously.
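Ref and Deferred are the basic building blocks in FP for writing concurrent code. Especially when used together with the tagless final abstraction, they give us both concurrent access and referential transparency, and we can use them to build more advanced structures such as counters and state machines.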
, Ref Deferred, , concurrency Cats Java AtomicReference
, .
Atomic Reference
AtomicReference
— java.util.concurrent.atomic
. Oracle docs , java.util.concurrent.atomic
— :
, « » . ,
volatile
, ,atomic
…
AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference ( ).
AtomicReference
Java 1.5 , ( ).
(threads), . int: i = i + 1
. 3 , i
, 1
, i
. , , thread 3 thread, i
.
synchronized
lock
, atomic.*
, atomic () , .
, AtomicInteger.incrementAndGet
:
/**
* Atomically increments by one the current value.
*
* <p>Implemented as a retry loop: the current value is read, the
* incremented value is computed, and the write is attempted through
* {@code compareAndSet}. The loop repeats until that attempt succeeds.
*
* @return the updated value
*/
public final int incrementAndGet() {
// Retry until compareAndSet reports success.
for (;;) {
int current = get();
int next = current + 1;
// NOTE(review): presumably compareAndSet fails when the stored value no
// longer equals `current` (changed by another thread) - confirm in the JDK docs.
if (compareAndSet(current, next))
return next;
}
}
compareAndSet
, , thread . , compareAndSet
incrementAndGet
, , get()
. , (statement), «» , thread , .
, , - concurrency.
Ref
Ref
Cats atomic () Java. , Ref
tagless final F
. , , Ref
— A
, (immutable).
/**
 * A reference to a value of type `A` whose operations are all expressed
 * in the effect type `F`.
 *
 * @tparam F effect type in which every operation is returned
 * @tparam A type of the referenced value
 */
abstract class Ref[F[_], A] {
/** Yields, in `F`, a value of type `A` - presumably the current content of the reference. */
def get: F[A]
/** Takes a new value `a`; presumably replaces the current content with it. */
def set(a: A): F[Unit]
/**
 * Applies `f` to the current value: the first component of the resulting
 * pair becomes the new value, the second component is the result.
 */
def modify[B](f: A => (A, B)): F[B]
// ... and more
}
Ref[F[_], A]
— a mutable reference that is:
Concurrent
Lock-free
,
F
, , cats.effect.IO
.
Cats Ref
, , F
, Sync
.
/**
 * Builds a `Ref` initialised with `a`.
 *
 * The construction itself (`unsafe(a)`) is suspended with `F.delay`, so the
 * reference is only created when the returned `F` is evaluated.
 *
 * @param a initial value
 * @param F `Sync` instance used to suspend the construction
 * @return the new reference, wrapped in `F`
 */
def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] =
  F.delay {
    unsafe(a)
  }
, Ref
; Ref
.
Sync
delay
Ref
.
Ref
— , get
, set
of
, .
get
and set
, ( Shared), threads, get
set
, , :
/**
 * Records `msg` in the shared trace: reads the current `Shared` from `trace`
 * and writes back a new `Shared` that wraps the previous one together with `msg`.
 *
 * NOTE: the read (`get`) and the write (`set`) are two separate effects, so
 * this is not a single atomic read-modify-write. Presumably a concurrent
 * writer can change the reference between the two steps.
 *
 * @param trace reference holding the current `Shared`
 * @param msg   message to record
 */
def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] = {
  for {
    // `get` is declared without a parameter list, so it must be called without `()`.
    sh <- trace.get
    _  <- trace.set(Shared(sh, msg))
  } yield ()
}
Shared
— Shared
, , — , , .
Shared(prev: Shared, msg: String)
.
F
IO Cats Effect, , Ref
F .
monadic
() IO flatMap
, Ref
— ... , , .
, modifyShared
, ! , , , , threads get
set
. get
set
(atomically) .
Atomic () update
, Ref
. get
set
update
.
/**
 * Takes a function from the current value to the new value.
 * NOTE(review): presumably the read and the write happen as one atomic step -
 * confirm against the Ref implementation.
 *
 * @param f computes the new value from the current one
 */
def update(f: A => A): F[Unit]
, update
. , , get
set
, , , Ref
Int
:
// Increment the counter, read it back, then print the value that was read.
someRef
  .update(_ + 1)
  .flatMap(_ => someRef.get)
  .flatMap(curr => IO { println(s"current value is $curr") })
  .map(_ => ())
modify
, modify
, , update
, , modify
.
/**
 * Applies `f` to the current value, stores the first component of the
 * result as the new value and returns the second component.
 *
 * The write goes through `compareAndSet`; when it fails the whole step is
 * retried, so `f` may be evaluated more than once.
 *
 * @param f computes the new value and the result from the current value
 * @return the result component of `f`, suspended in `F`
 */
def modify[B](f: A => (A, B)): F[B] = {
  @tailrec
  def spin: B = {
    val current = ar.get
    val (updated, result) = f(current)
    // Success: hand back the result. Failure: start over with a fresh read.
    if (ar.compareAndSet(current, updated)) result
    else spin
  }
  F.delay(spin)
}
, , AtomicInteger.incrementAndGet
, , Scala. , Ref
AtomicReference
.
Ref
, , , , update
/ modify
, (nondeterministically) , , . , , , , .
, Ref
, Cats Concurrent: Deferred
( ).
Deferred
Ref
, Deferred
:
«» ( )
«».
Deferred
.
/**
 * A primitive with two operations, `get` and `complete`, both expressed in
 * the effect type `F`.
 *
 * @tparam F effect type in which every operation is returned
 * @tparam A type of the value being passed
 */
abstract class Deferred[F[_], A] {
/**
 * Yields, in `F`, the value of type `A`.
 * NOTE(review): presumably waits until `complete` has been called - confirm.
 */
def get: F[A]
/**
 * Supplies the value `a`.
 * NOTE(review): presumably can only succeed once per instance - confirm.
 */
def complete(a: A): F[Unit]
}
Deferred
. get
«» Deferred
, . :
, threads ()
get
«» Deferred
.
— complete
— , «» Deferred
( IO).
, Deferred
, F
Concurrent
, , .
Deferred
, .
The following example is from the Scala Italy 2019 talk "Composable Concurrency with Ref + Deferred", available on Vimeo:
/**
 * Sets the consumer up, completes `done` to signal that setup has finished,
 * then reads one message and prints it.
 *
 * @param done signal completed once `Consumer.setup` has run
 */
def consumer(done: Deferred[IO, Unit]) =
  Consumer.setup.flatMap { c =>
    done
      .complete(())
      .flatMap(_ => c.read)
      .flatMap(msg => IO(println(s"Received $msg")))
      .map(_ => ())
  }
/**
 * Sets the producer up, obtains the `done` signal via `done.get`, then
 * writes one message and prints it.
 *
 * @param done signal obtained before the message is written
 */
def producer(done: Deferred[IO, Unit]) =
  Producer.setup().flatMap { p =>
    done.get.flatMap { _ =>
      val msg = "Msg A"
      p.write(msg)
        .flatMap(_ => IO(println(s"Sent $msg")))
        .map(_ => ())
    }
  }
/**
 * Creates one `Deferred[IO, Unit]`, then starts the consumer and the
 * producer, both sharing that same instance.
 */
def prog =
  Deferred[IO, Unit].flatMap { gate =>
    consumer(gate).start.flatMap { _ =>
      producer(gate).start.map(_ => ())
    }
  }
producer () consumer (), , producer , consumer setup , , , producer, . Deferred
get
, done
Deferred
consumer ( Unit ()
).
, , consumer setup
, , producer
. , get
, Either[Throwable, Unit]
- Unit
Deferred
.
Deferred
, Ref
, semaphores ().
Cats, Cats concurrency , .