Fetch - a library for accessing data

Fetch is a Scala library for organizing access to data from file systems, databases, web services and any other source whose records can be retrieved by a unique identifier. The library is written in a functional style and is built on Cats and Cats Effect. It is designed for composing and optimizing queries to different data sources. It allows you to:



  • request data from multiple sources in parallel;
  • request data from one source in parallel;
  • combine several requests to one source into a single batched request;
  • deduplicate queries in each of the listed situations;
  • cache query results.


The library provides tools for this that let you keep business code clean, free of the low-level plumbing needed to implement the listed optimizations by hand.

The examples use the latest version of Fetch at the time of writing, 1.3.0.



Data source in Fetch



To implement access to any source through Fetch, you need to implement:



  • description of the data source (trait Data[I, A]);
  • methods for obtaining data from the source (trait DataSource[F[_], I, A]).


DataSource[F[_], I, A] describes how the data is obtained. Here I is the type of the identifier (for example, an ID), A is the type of the result, and F is the effect in which the data is fetched. The trait looks like this:



/**
 * A `DataSource` is the recipe for fetching a certain identity `I`, which yields
 * results of type `A` performing an effect of type `F[_]`.
 */
trait DataSource[F[_], I, A] {
  def data: Data[I, A]

  implicit def CF: Concurrent[F]

  /** Fetch one identity, returning a None if it wasn't found.
   */
  def fetch(id: I): F[Option[A]]

  /** Fetch many identities, returning a mapping from identities to results. If an
   * identity wasn't found, it won't appear in the keys.
   */
  def batch(ids: NonEmptyList[I]): F[Map[I, A]] =
    FetchExecution.parallel(
      ids.map(id => fetch(id).tupleLeft(id))
    ).map(_.collect { case (id, Some(x)) => id -> x }.toMap)

  def maxBatchSize: Option[Int] = None

  def batchExecution: BatchExecution = InParallel
}


data: Data[I, A] returns the Data[I, A] description of the source. CF is the Concurrent instance for the effect F, the "capability" needed to run and combine fetches. fetch retrieves a single element by its ID. batch retrieves many elements by their IDs; by default it simply runs fetch in parallel for every ID and collects the found values into a map. It can be overridden, for example, when the source supports real batch queries.
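The Data[I, A] trait itself is minimal: in the examples below only its name is overridden. An abridged sketch (the real trait also carries an identity that Fetch uses internally to tell sources apart):

trait Data[I, A] {
  def name: String
}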



Here is an example of a data source for Fetch:



class ListData(val list: List[String]) extends Data[Int, String] {
  override def name: String = "My List of Data"
}

class ListDataSource(list: ListData)(implicit cs: ContextShift[IO])
    extends DataSource[IO, Int, String]
    with LazyLogging {

  override def data: ListData = list

  /* making this implicit would cause a stack overflow: the implicit search would resolve to this very method */
  override def CF: Concurrent[IO] = Concurrent[IO]

  override def fetch(id: Int): IO[Option[String]] =
    CF.delay {
      logger.info(s"Processing element from index $id")
      data.list.lift(id)
    }
}


A more convenient option is to put the DataSource inside the Data implementation. For example:



class ListSource(list: List[String])(implicit cf: ContextShift[IO]) extends Data[Int, String] with LazyLogging {
  override def name: String        = "My List of Data"
  private def instance: ListSource = this

  def source = new DataSource[IO, Int, String] {
    override def data: Data[Int, String] = instance

    override def CF: Concurrent[IO] = Concurrent[IO]

    override def fetch(id: Int): IO[Option[String]] =
      CF.delay {
        logger.info(s"Processing element from index $id")
        list.lift(id)
      }
  }
}


The fetch method describes how a single element is obtained from the source; logging is added so that every real call to the source is visible. With such a class in place we can start building Fetch queries against it.



Given a DataSource, data access is described with Fetch. A Fetch value is only a description, a "plan" of the query; nothing is executed when it is created. It is executed with Fetch.run (there are several run variants, more on them later). Creating a Fetch requires an ID and a source; running it requires an effect F with a Concurrent[F] instance in scope. For example:



val list                                = List("a", "b", "c")
val data: ListSource                    = new ListSource(list)
val source: DataSource[IO, Int, String] = data.source

val fetchDataPlan: Fetch[IO, String] = Fetch(1, source)
val fetchData: IO[String]            = Fetch.run(fetchDataPlan)
val dataCalculated: String           = fetchData.unsafeRunSync // b


Recall that here the ID is simply the index of an element in the list, and the result is the element itself.



A complete runnable example:



object Example extends App {

  implicit val ec: ExecutionContext = global
  implicit val cs: ContextShift[IO] = IO.contextShift(ec)  // required by Fetch.run and by the source
  implicit val timer: Timer[IO]     = IO.timer(ec) // required by Fetch.run

  val list = List("a", "b", "c")
  val data   = new ListSource(list)
  val source = data.source

  Fetch.run(Fetch(0, source)).unsafeRunSync  
  // INFO ListDataSource - Processing element from index 0

  Fetch.run(Fetch(1, source)).unsafeRunSync  
  // INFO ListDataSource - Processing element from index 1

  Fetch.run(Fetch(2, source)).unsafeRunSync  
  // INFO ListDataSource - Processing element from index 2

  Fetch.run(Fetch(3, source)).unsafeRunSync  
  // INFO ListDataSource - Processing element from index 3
  // Exception in thread "main" fetch.package$MissingIdentity
}


Note that data.list.lift(id) inside fetch returns an Option, and fetch returns None when the element is missing. This Option, however, is not reflected in the source type, which is still DataSource[F[_], I, A]. To get an Option in the result, build the Fetch with optional instead of apply:



Fetch.run(Fetch.optional(3, source)).unsafeRunSync  // None


Compare the result types:



val fApply: Fetch[IO, String]            = Fetch(3, source)
val fOptional: Fetch[IO, Option[String]] = Fetch.optional(3, source)


It is convenient to add a helper method that builds the Fetch directly to the source class, for example a wrapper over optional:



def fetchElem(id: Int): Fetch[IO, Option[String]] = Fetch.optional(id, source)


With fetchElem added to ListSource, usage looks like this:



Fetch.run(data.fetchElem(0)).unsafeRunSync // INFO app.ListDataSource - Processing element from index 0
Fetch.run(data.fetchElem(1)).unsafeRunSync // INFO app.ListDataSource - Processing element from index 1
Fetch.run(data.fetchElem(2)).unsafeRunSync // INFO app.ListDataSource - Processing element from index 2
println(Fetch.run(data.fetchElem(2)).unsafeRunSync) // Some(c)
println(Fetch.run(data.fetchElem(3)).unsafeRunSync) // None




Caching

Out of the box, every call to Fetch.run starts "from scratch"; nothing is remembered between runs:



def fetch(id: Int): Option[String] = {
  val run   = Fetch.run(data.fetchElem(id))
  run.unsafeRunSync
}

fetch(1)  // INFO app.ListDataSource - Processing element from index 1
fetch(1)  // INFO app.ListDataSource - Processing element from index 1
fetch(1)  // INFO app.ListDataSource - Processing element from index 1


Caching in Fetch is built around the DataCache[F[_]] trait. The bundled implementation is InMemoryCache[F[_]: Monad](state: Map[(Data[Any, Any], DataSourceId), DataSourceResult]). Its companion object provides two constructors: from builds a cache prefilled with values, and empty builds an empty one:



def from[F[_]: Monad, I, A](results: ((Data[I, A], I), A)*): InMemoryCache[F] 
def empty[F[_]: Monad]: InMemoryCache[F]
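Both constructors return a DataCache[F]. The interface itself boils down to two methods, shown here as an abridged sketch (the real trait also has helpers with default implementations); the same signatures are implemented by the custom cache later in this article:

trait DataCache[F[_]] {
  def lookup[I, A](i: I, data: Data[I, A]): F[Option[A]]
  def insert[I, A](i: I, v: A, data: Data[I, A]): F[DataCache[F]]
}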


Note the type of the underlying Map: Map[(Data[Any, Any], DataSourceId), DataSourceResult]. The key and value wrappers are simple:



final class DataSourceId(val id: Any)         extends AnyVal
final class DataSourceResult(val result: Any) extends AnyVal


So the cache key is the pair (Data[Any, Any], DataSourceId), i.e. the source plus the element's ID, and the value is a DataSourceResult. A single cache can hold entries for many different sources, which is why Fetch erases the types to Any inside it and restores them with casts. Here is the lookup method of InMemoryCache:



def lookup[I, A](i: I, data: Data[I, A]): F[Option[A]] =
  Applicative[F].pure(
    state
      .get((data.asInstanceOf[Data[Any, Any]], new DataSourceId(i)))
      .map(_.result.asInstanceOf[A])
  )


Since Data uniquely identifies the source, and the source fixes the result type A, the asInstanceOf[A] cast from Any is safe in practice. Insertion works the same way through the Map's updated, producing a new cache.
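A rough sketch of what that insert looks like (not the library's exact code, but close in spirit):

def insert[I, A](i: I, v: A, data: Data[I, A]): F[DataCache[F]] =
  Applicative[F].pure(
    InMemoryCache(
      state.updated(
        (data.asInstanceOf[Data[Any, Any]], new DataSourceId(i)),
        new DataSourceResult(v)
      )
    )
  )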



Note that the Map here is Scala's immutable Map, so InMemoryCache is immutable as well: inserting returns a new cache instead of mutating the old one. This matters for how the cache is threaded between runs, as we will see below.



An example with a cache prefilled via from:



val cacheF: DataCache[IO] = InMemoryCache.from((data, 1) -> "b", (data, 2) -> "c")

Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync  // served from the cache, the source is not called
Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync  // served from the cache, the source is not called
Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync
Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync
Fetch.run(data.fetchElem(0), cacheF).unsafeRunSync
Fetch.run(data.fetchElem(0), cacheF).unsafeRunSync

// INFO app.ListDataSource - Processing element from index 0
// INFO app.ListDataSource - Processing element from index 0


Element 1 is always taken from the cache, while element 0 hits the source on every run: the cache passed to Fetch.run is an immutable Map, and the updated copy produced inside the run is simply discarded. To carry the cache across runs there is Fetch.runCache, which returns the updated cache along with the result (the example stores it in a var for brevity, which is of course not a pattern for production code):



var cache: DataCache[IO] = InMemoryCache.empty

def cachedRun(id: Int): Option[String] = {
  val (c, r) = Fetch.runCache(Fetch.optional(id, source), cache).unsafeRunSync
  cache = c  // keep the updated cache for the next run
  r
}

cachedRun(1)
cachedRun(1)
cachedRun(2)
cachedRun(2)
cachedRun(4)
cachedRun(4)

// INFO app.ListDataSource - Processing element from index 1
// INFO app.ListDataSource - Processing element from index 2
// INFO app.ListDataSource - Processing element from index 4
// INFO app.ListDataSource - Processing element from index 4


Now every element that exists in the source is fetched only once, and later runs take it from the cache. A missing element (index 4) is not cached, so it is requested every time.



A custom cache: Caffeine



You can supply your own DataCache implementation to Fetch. Below is a DataCache backed by the Java caching library Caffeine, through its Scala wrapper Scaffeine:



class ScaffeineCache extends DataCache[IO] with LazyLogging {

  private val cache =
    Scaffeine()
      .recordStats()
      .expireAfterWrite(1.hour)
      .maximumSize(500)
      .build[(Data[Any, Any], Any), Any]()

  override def lookup[I, A](i: I, data: Data[I, A]): IO[Option[A]] = IO {
    cache
      .getIfPresent(data.asInstanceOf[Data[Any, Any]] -> i)
      .map { any =>
        val correct = any.asInstanceOf[A]
        logger.info(s"From cache: $i")
        correct
      }
  }

  override def insert[I, A](i: I, v: A, data: Data[I, A]): IO[DataCache[IO]] = {
    cache.put(data.asInstanceOf[Data[Any, Any]] -> i, v) // Unit
    IO(this)
  }

}


The key structure mirrors InMemoryCache, with the Scaffeine cache typed over Any: build[(Data[Any, Any], Any), Any](). Type safety is again restored with asInstanceOf. Usage:



val list  = List("a", "b", "c")
val listSource  = new ListSource(list)
val source = listSource.source
val randomSource = new RandomSource()
val cache = new ScaffeineCache()

/** Without a cache: every run hits the source */
Fetch.run(Fetch(1, source)).unsafeRunSync // Processing element from index 1
Fetch.run(Fetch(1, source)).unsafeRunSync // Processing element from index 1

println()

/** With the Caffeine-backed cache */
Fetch.run(Fetch(1, source), cache).unsafeRunSync // Processing element from index 1
Fetch.run(Fetch(1, source), cache).unsafeRunSync // From cache: 1
Fetch.run(Fetch("a", source), cache).unsafeRunSync // type mismatch

/** Different sources share the same cache without clashing */
Fetch.run(randomSource.fetchInt(2), cache).unsafeRunSync  // Getting next random by max 2
Fetch.run(randomSource.fetchInt(2), cache).unsafeRunSync  // From cache: 2
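The fetchInt helper used above is not shown in the listings; presumably RandomSource defines it the same way ListSource defines fetchElem, along these lines:

// hypothetical helper on RandomSource, by analogy with ListSource.fetchElem
def fetchInt(max: Int): Fetch[IO, Option[Int]] = Fetch.optional(max, source)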


A few things worth noting:



  • despite the Any keys and asInstanceOf casts inside the cache, passing a Fetch an ID whose type does not match the source is still a compile-time type mismatch;
  • a single cache instance can safely serve several different sources;
  • Caffeine is a mutable cache, so unlike the immutable InMemoryCache its insert simply puts the value and returns the same DataCache instance.




Combining queries

Fetch values compose with the standard Scala and Cats combinators. For example, a List[Fetch[_, _]] can be turned into a Fetch[_, List[_]] and executed with a single Fetch.run. This is where the optimizations promised at the start come in: deduplication, batching, parallelism and caching. The key point is that a Fetch is only a description of a query, and all optimizations are applied when the combined Fetch is given to Fetch.run.



The order of composition matters. Compare the two approaches below. First, combining the results of two separate runs:



val t1: IO[(String, String)] = for {
  a <- Fetch.run(Fetch(1, source))
  b <- Fetch.run(Fetch(1, source))
} yield (a, b)


In this for comprehension each Fetch is run on its own, so nothing can be deduplicated or batched between them. The alternative is to compose the Fetch values first and run the combined description once:



val f1: Fetch[IO, (String, String)] = for {
  a <- Fetch(1, source)
  b <- Fetch(1, source)
} yield (a,b)

val t2: IO[(String, String)] = Fetch.run(f1)


Here the two Fetches are combined into a single description, which one Fetch.run can optimize. With a combined Fetch, run is able to:



  • execute independent requests in parallel;
  • combine requests to the same source into one batch;
  • deduplicate identical requests.


Batching relies on the batch method of DataSource described earlier.



The combinators are the usual ones from Cats. For example, sequence turns a List[F[_]] into an F[List[_]]; traverse does the same while applying a function, List[A] => F[List[B]]; tuples of Fetches can be combined with tupled:



val f3: List[Fetch[IO, String]] = List(
  Fetch(1, source),
  Fetch(2, source),
  Fetch(2, source)
)

val f31: Fetch[IO, List[String]] = f3.sequence
val t3: IO[List[String]]         = Fetch.run(f31)

val f4: List[Int] = List(
  1,
  2,
  2
)

val f41: Fetch[IO, List[String]] = f4.traverse(Fetch(_, source))
val t4: IO[List[String]]         = Fetch.run(f41)

val f5: (Fetch[IO, String], Fetch[IO, String]) = (Fetch(1, source), Fetch(2, source))
val f51: Fetch[IO, (String, String)]           = f5.tupled
val t5: IO[(String, String)]                   = Fetch.run(f51)

val f6: (Int, Int)                = (1, 2)
val f61: Fetch[IO, (Int, String)] = f6.traverse(Fetch(_, source))


And, of course, flatMap for sequential composition:



val f0: Fetch[IO, String]  = Fetch(1, source).flatMap(_ => Fetch(1, source))
val t0: IO[String]         = Fetch.run(f0)


In other words, Fetch values are combined with ordinary Cats combinators. Since a Fetch is just a description, combining is cheap; the actual execution, together with the optimizations, happens only at run time. Let's look at the optimizations one by one.



Combining requests (Batching)



Fetch can combine several requests to one source into a single batch. For example:



import fetch.fetchM  // brings the Cats instances for Fetch into scope

val tuple: Fetch[IO, (Option[String], Option[String])] = (data.fetchElem(0), data.fetchElem(1)).tupled

Fetch.run(tuple).unsafeRunSync()  // (Some(a),Some(b))


By default, batch in DataSource just calls fetch in parallel for every ID. Overriding it makes sense when the source supports genuine batch queries, for example a single SQL query with an IN clause.
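For illustration, a sketch of what a "real" batch could look like inside the DataSource of ListSource: the whole batch is answered in one pass over the in-memory list, which is where a single SELECT ... WHERE id IN (...) would go for a database-backed source:

// answer the whole batch in one pass instead of one fetch call per ID
override def batch(ids: NonEmptyList[Int]): IO[Map[Int, String]] =
  CF.delay {
    logger.info(s"Batch query for ids: $ids")
    ids.toList.flatMap(id => list.lift(id).map(id -> _)).toMap
  }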



To see batching in action, let's override batch in ListSource, keeping the default behaviour but adding logging:



override def batch(ids: NonEmptyList[Int]): IO[Map[Int, String]] = {
  logger.info(s"IDs fetching in batch: $ids")
  super.batch(ids)
}


Now combine several requests with traverse:



import fetch.fetchM 

def findMany: Fetch[IO, List[Option[String]]] =
  List(0, 1, 2, 3, 4, 5).traverse(data.fetchElem)

Fetch.run(findMany).unsafeRunSync
// INFO app.ListSource - IDs fetching in batch: NonEmptyList(0, 5, 1, 2, 3, 4)


The batch size can be limited with maxBatchSize:



override def maxBatchSize: Option[Int] = 2.some  // defaults to None

// INFO app.ListSource - IDs fetching in batch: NonEmptyList(0, 5)
// INFO app.ListSource - IDs fetching in batch: NonEmptyList(1, 2)
// INFO app.ListSource - IDs fetching in batch: NonEmptyList(3, 4)


Sub-batches are executed in parallel by default; this is controlled by batchExecution:



override def batchExecution: BatchExecution = Sequentially // defaults to `InParallel`
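Later examples construct the source as new ListSource(list, 2.some), so presumably ListSource was extended to take the batch size as a constructor parameter. A consolidated sketch of such a source, putting together the pieces shown above (not the article's exact code):

class ListSource(list: List[String], batchSize: Option[Int] = None)(implicit cf: ContextShift[IO])
    extends Data[Int, String]
    with LazyLogging {
  override def name: String        = "My List of Data"
  private def instance: ListSource = this

  def source: DataSource[IO, Int, String] = new DataSource[IO, Int, String] {
    override def data: Data[Int, String] = instance

    override def CF: Concurrent[IO] = Concurrent[IO]

    override def maxBatchSize: Option[Int] = batchSize

    override def fetch(id: Int): IO[Option[String]] =
      CF.delay {
        logger.info(s"Processing element from index $id")
        list.lift(id)
      }

    // keep the default per-ID fetching, just log which IDs ended up in the batch
    override def batch(ids: NonEmptyList[Int]): IO[Map[Int, String]] = {
      logger.info(s"IDs fetching in batch: $ids")
      super.batch(ids)
    }
  }

  def fetchElem(id: Int): Fetch[IO, Option[String]] = Fetch.optional(id, source)
}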




Keep in mind that all these optimizations happen within a single Fetch.run: separate runs know nothing about each other, so to benefit from them the requests must be combined into one Fetch.



Combining different sources

In practice a query often needs data from several sources, including cases where an ID obtained from one source is used to query another. To show how Fetch handles this, let's add a second source that generates random numbers:



class RandomSource(implicit cf: ContextShift[IO]) extends Data[Int, Int] with LazyLogging {

  override def name: String          = "Random numbers generator"
  private def instance: RandomSource = this

  def source: DataSource[IO, Int, Int] = new DataSource[IO, Int, Int] {
    override def data: Data[Int, Int] = instance

    override def CF: Concurrent[IO] = Concurrent[IO]

    override def fetch(max: Int): IO[Option[Int]] =
      CF.delay {
        logger.info(s"Getting next random by max $max")
        scala.util.Random.nextInt(max).some
      }
  }
}


Now combine a Fetch against randomSource with a Fetch against listSource:



val listSource = new ListSource(List("a", "b", "c"))
val randomSource = new RandomSource()

def fetchMulti: Fetch[IO, (Int, String)] =
  for {
    rnd <- Fetch(3, randomSource.source)  // Fetch[IO, Int]
    char <- Fetch(rnd, listSource.source)  // Fetch[IO, String]
  } yield (rnd, char)

println(Fetch.run(fetchMulti).unsafeRunSync)  // for example, (0,a)


These requests are executed sequentially, because the second one depends on the result of the first: first a random index is generated, then the element at that index is fetched. Requests that do not depend on each other are executed in parallel, even when they target different sources.





To sum up the combinators: requests combined with a for comprehension or flatMap are executed sequentially, since each step may depend on the previous one. Requests combined with sequence or traverse are independent, so they are batched within a source and run in parallel across sources. The same applies to tupled. We will see this clearly in the debugging section below.
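A quick illustration (a sketch assuming the listSource and randomSource values defined above, plus import fetch.fetchM and the Cats syntax imports, are in scope): two independent requests to different sources combined with tupled are executed within one round, i.e. in parallel:

val parallelPair: Fetch[IO, (String, Int)] =
  (Fetch(1, listSource.source), Fetch(10, randomSource.source)).tupled

Fetch.run(parallelPair).unsafeRunSync  // both sources are queried in a single round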





Deduplication

Fetch also deduplicates requests: identical requests, i.e. requests with the same source and the same ID, are executed only once within a single Fetch. A couple of examples follow.



Duplicates inside one tupled pair:



val list = List("a", "b", "c", "d", "e", "f", "g", "h", "i")
val data = new ListSource(list, 2.some)

val tupleD: Fetch[IO, (Option[String], Option[String])] = (data.fetchElem(0), data.fetchElem(0)).tupled
Fetch.run(tupleD).unsafeRunSync()
//INFO app.sources.ListSource - Processing element from index 0
//(Some(a),Some(a))


And duplicates across steps of a for comprehension, even with two different sources involved:



def fetchMultiD: Fetch[IO, (Int, String, Int, String)] =
  for {
    rnd1 <- Fetch(3, randomSource.source)  // Fetch[IO, Int]
    char1 <- Fetch(rnd1, listSource.source)  // Fetch[IO, String]
    rnd2 <- Fetch(3, randomSource.source)  // Fetch[IO, Int]
    char2 <- Fetch(rnd2, listSource.source)  // Fetch[IO, String]
  } yield (rnd1, char1, rnd2, char2)

println(Fetch.run(fetchMultiD).unsafeRunSync)
//18:43:11.875 [scala-execution-context-global-14] INFO app.sources.RandomSource - Getting next random by max 3
//18:43:11.876 [scala-execution-context-global-13] INFO app.sources.ListSource - Processing element from index 1
//(1,b,1,b)




Error handling

Fetch defines its own exception hierarchy:



  • FetchException, the base error type;
  • MissingIdentity, thrown when an ID is not found in the source;
  • UnhandledException, wrapping any other exception raised during fetching.


Besides Fetch.optional, which turns a missing element into an Option, errors can be handled in the effect F itself, for example by converting them to Either with attempt:



// val i: String = Fetch.run(Fetch(5, data.source)).unsafeRunSync // Exception in thread "main" fetch.package$MissingIdentity

val i: Either[Throwable, String] = Fetch.run(Fetch(5, data.source)).attempt.unsafeRunSync // Left(fetch.package$MissingIdentity)
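Or, if a default value is acceptable, recover directly in IO; a small sketch using cats-effect's handleErrorWith:

val withFallback: IO[String] =
  Fetch.run(Fetch(5, data.source)).handleErrorWith(_ => IO.pure("fallback"))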


Debugging Fetch



For debugging, Fetch offers Fetch.runLog, which returns the execution Log (implemented by FetchLog) together with the result. The fetch-debug module adds the describe function, which renders a Throwable or a Log in human-readable form.



An example of describing a throwable:



// libraryDependencies += "com.47deg" %% "fetch-debug" % "1.3.0"
import fetch.debug.describe
val t: Either[Throwable, (Log, String)] = Fetch.runLog(Fetch(5, data.source)).attempt.unsafeRunSync
println(t.fold(describe, identity))
// [ERROR] Identity with id `5` for data source `My List of Data` not found, fetch interrupted after 1 rounds
// Fetch execution 0.00 seconds
//
//    [Round 1] 0.00 seconds
//      [Fetch one] From `My List of Data` with id 5 0.00 seconds


A more elaborate example combining several Fetches (>> is the Cats operator for flatMap(_ => ...)):



object DebugExample extends App with ContextEntities {
  val list = List("a", "b", "c", "d", "e", "f", "g", "h")

  val listData                                = new ListSource(list)
  val listSource: DataSource[IO, Int, String] = listData.source
  val randomSource                            = new RandomSource().source

  val cacheF: DataCache[IO] = InMemoryCache.from((listData, 1) -> "b")

  // this value is already in the cache
  val cached = Fetch(1, listSource)

  // round #1: not in the cache, will hit the source
  val notCached = Fetch(2, listSource)

  // round #2: a request to another source
  val random = Fetch(10, randomSource)

  // round #3: two requests to one source, combined with tupled
  val batched: Fetch[IO, (String, String)] = (Fetch(3, listSource), Fetch(4, listSource)).tupled

  // round #4: requests to two different sources, combined with tupled
  val combined = (Fetch(5, listSource), Fetch(150, randomSource)).tupled

  /** End of fetches */
  val complicatedFetch: Fetch[IO, (String, Int)] = cached >> notCached >> random >> notCached >> batched >> combined
  val result: IO[(Log, (String, Int))]           = Fetch.runLog(complicatedFetch, cacheF)
  val tuple: (Log, (String, Int))                = result.unsafeRunSync()

  println(tuple._2) // (f,17)
  println(describe(tuple._1))
  println(tuple._1)

  //Fetch execution 0.11 seconds
  //
  //  [Round 1] 0.06 seconds
  //    [Fetch one] From `My List of Data` with id 2 0.06 seconds
  //  [Round 2] 0.00 seconds
  //    [Fetch one] From `Random numbers generator` with id 10 0.00 seconds
  //  [Round 3]  0.01 seconds
  //    [Batch] From `My List of Data` with ids List(3, 4)  0.01 seconds
  //  [Round 4]  0.00 seconds
  //    [Fetch one] From `Random numbers generator` with id 150  0.00 seconds
  //    [Fetch one] From `My List of Data` with id 5  0.00 seconds

  // raw:
  // FetchLog(Queue(Round(List(Request(FetchOne(2,app.sources.ListSource@ea6147e),10767139,10767203))), Round(List(Request(FetchOne(10,app.sources.RandomSource@58b31054),10767211,10767213))), Round(List(Request(Batch(NonEmptyList(3, 4),app.sources.ListSource@ea6147e),10767234,10767242))), Round(List(Request(FetchOne(150,app.sources.RandomSource@58b31054),10767252,10767252), Request(FetchOne(5,app.sources.ListSource@ea6147e),10767252,10767252)))))
}


The log shows that:



  • the cached request does not appear in the log at all, its value comes from the cache;
  • notCached is executed only once even though it appears twice in the chain, thanks to deduplication;
  • the two requests combined with tupled are executed as a single batch;
  • the requests to different sources in the last round are executed in parallel.


Each step of the >> chain becomes its own round, and the rounds run sequentially. Elements 3 and 4, combined with tupled, are fetched within one round as a single batch. The final round issues two requests to different sources at once, in parallel. The raw FetchLog printed at the end contains the same information, just in a less readable form.



A larger example: document search



To wrap up, let's assemble a more realistic example: a document search service. For a given query it has to:



  • find document IDs with a full-text search;
  • load information about every found document;
  • find similar documents for every ID;
  • load the authors of every document.


The model and the service (repositories and sources are omitted for brevity):



/*Model*/

type DocumentId = String
type PersonId = Int

case class FtsResponse(ids: List[DocumentId])

case class SimilarityItem(id: DocumentId, similarity: Double)

case class DocumentInfo(id: DocumentId, info: String, authors: List[PersonId])

case class Person(id: PersonId, fullTitle: String)

/*Response*/
case class DocumentSearchResponse(
    items: List[DocumentSearchItem]
)

case class DocumentItem(id: DocumentId, info: Option[String], authors: List[Person])

case class DocumentSimilarItem(
    item: DocumentItem,
    similarity: Double
)

case class DocumentSearchItem(
    item: DocumentItem,
    similar: List[DocumentSimilarItem]
)

class DocumentSearchExample(
    fts: Fts[IO],
    documentInfoRepo: DocumentInfoRepo[IO],
    vectorSearch: VectorSearch[IO],
    personRepo: PersonRepo[IO]
)(
    implicit cs: ContextShift[IO]
) {

  val infoSource    = new DocumentInfoSource(documentInfoRepo, 16.some)
  val personSource  = new PersonSource(personRepo, 16.some)
  val similarSource = new SimilarDocumentSource(vectorSearch, 16.some)

  def documentItemFetch(id: DocumentId): Fetch[IO, DocumentItem] =
    for {
      infoOpt <- infoSource.fetchElem(id)
      p       <- infoOpt.traverse(i => i.authors.traverse(personSource.fetchElem).map(_.flatten))
    } yield DocumentItem(id, infoOpt.map(_.info), p.getOrElse(List.empty[Person]))

  def fetchSimilarItems(id: DocumentId): Fetch[IO, List[DocumentSimilarItem]] =
    similarSource
      .fetchElem(id)
      .map(_.getOrElse(List.empty[SimilarityItem]))
      .flatMap {
        _.traverse { si =>
          documentItemFetch(si.id).map { di =>
            DocumentSimilarItem(di, si.similarity)
          }
        }
      }

  def searchDocumentFetch(query: String): Fetch[IO, DocumentSearchResponse] =
    for {
      docs <- Fetch.liftF(fts.search(query))
      items <- docs.ids.traverse { id =>
                (documentItemFetch(id), fetchSimilarItems(id)).tupled.map(r => DocumentSearchItem(r._1, r._2))
              }
    } yield DocumentSearchResponse(items)

}


DocumentSearchExample receives its dependencies as constructor arguments: a full-text search client, repositories and a vector-similarity search, which in a real application could be implemented with, say, Doobie. On top of them three Fetch sources are built, DocumentInfoSource, PersonSource and SimilarDocumentSource, each structured like the ListSource shown earlier.
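As an illustration, here is a possible shape for one of them, DocumentInfoSource, assuming a hypothetical findByIds method on the repository that loads several documents in one query:

class DocumentInfoSource(repo: DocumentInfoRepo[IO], batchSize: Option[Int])(implicit cs: ContextShift[IO])
    extends Data[DocumentId, DocumentInfo] {
  override def name: String = "Document Info Source"
  private def instance: DocumentInfoSource = this

  def source: DataSource[IO, DocumentId, DocumentInfo] = new DataSource[IO, DocumentId, DocumentInfo] {
    override def data: Data[DocumentId, DocumentInfo] = instance

    override def CF: Concurrent[IO] = Concurrent[IO]

    override def maxBatchSize: Option[Int] = batchSize

    override def fetch(id: DocumentId): IO[Option[DocumentInfo]] =
      repo.findByIds(NonEmptyList.one(id)).map(_.get(id))

    // a single repository query for the whole batch
    override def batch(ids: NonEmptyList[DocumentId]): IO[Map[DocumentId, DocumentInfo]] =
      repo.findByIds(ids)
  }

  def fetchElem(id: DocumentId): Fetch[IO, Option[DocumentInfo]] = Fetch.optional(id, source)
}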



The entry point is searchDocumentFetch, which takes the query string. Full-text search is not a Fetch source but an ordinary effect; its signature is:



def search(query: String): F[FtsResponse]


Fetch.liftF lifts it into a Fetch[IO, FtsResponse], so it can take part in the for comprehension together with the other Fetches.



For every found ID we need both the document itself and its similar documents. The two Fetches are independent, so they are combined with tupled, and the result is mapped into a DocumentSearchItem.



fetchSimilarItems first asks similarSource for the documents similar to the given ID and then loads each of them with documentItemFetch. documentItemFetch, in turn, loads the DocumentInfo by ID and then the authors listed in it, so the steps that really depend on each other stay sequential.



Note the combination of map and flatMap in fetchSimilarItems: flatMap keeps the result from becoming a nested Fetch[IO, Fetch[...]]:



similarSource
  .fetchElem(id)
  .map(_.getOrElse(List.empty[SimilarityItem]))  // Fetch[IO, List[SimilarityItem]]
  .flatMap { 
    _.traverse { si =>
      documentItemFetch(si.id).map { di =>  // a nested Fetch, flattened by the outer flatMap
        DocumentSimilarItem(di, si.similarity)
      }
    }
  }


Mock data for the sources:



private val docInfo = Map(
  "1" -> DocumentInfo("1", "Document 1", List(1)),
  "2" -> DocumentInfo("2", "Document 2", List(1,2)),
  "3" -> DocumentInfo("3", "Document 3", List(3,1)),
  "4" -> DocumentInfo("4", "Document 4", List(2,1)),
  "5" -> DocumentInfo("5", "Document 5", List(2)),
  "6" -> DocumentInfo("6", "Document 6", List(1,3))
)

private val similars = Map(
  "1" -> List(SimilarityItem("2", 0.7), SimilarityItem("3", 0.6)),
  "2" -> List(SimilarityItem("1", 0.7)),
  "3" -> List(SimilarityItem("1", 0.6)),
  "4" -> List(),
  "5" -> List(SimilarityItem("6", 0.5)),
  "6" -> List(SimilarityItem("5", 0.5))
)

private val persons = Map(
  1 -> Person(1, "Rick Deckard"),
  2 -> Person(2, "Roy Batty"),
  3 -> Person(3, "Joe")
)


Running the search over this data, we get the following calls to the sources:



INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 2. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 4. It is: None
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 3. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 1. It is: Some(List(2, 3))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 6. It is: Some(List(5))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 5. It is: Some(List(6))
INFO app.searchfetchproto.source.DocumentInfoSource - Document IDs fetching in batch: NonEmptyList(4, 5, 2, 3, 6, 1)
INFO app.searchfetchproto.source.PersonSource - Person IDs fetching in batch: NonEmptyList(1, 2, 3)


The debug log confirms that the document infos and the persons were each fetched in a single batch, and the whole search took two rounds:



[Round 1]  0.12 seconds
    [Batch] From `Similar Document Source` with ids List(4, 5, 2, 3, 6, 1) 0.06 seconds
    [Batch] From `Document Info Source` with ids List(4, 5, 2, 3, 6, 1) 0.12 seconds
  [Round 2]  0.00 seconds
    [Batch] From `Persons source` with ids List(1, 2, 3)  0.00 seconds


Recall the key line of searchDocumentFetch:



(documentItemFetch(id), fetchSimilarItems(id)).tupled


Thanks to tupled, the fetchSimilarItems and documentItemFetch requests for all document IDs are independent, so the similarity lookups and the document-info lookups each collapse into one batch:



[Batch] From `Similar Document Source` with ids List(4, 5, 2, 3, 6, 1) 0.06 seconds
[Batch] From `Document Info Source` with ids List(4, 5, 2, 3, 6, 1) 0.12 seconds


This works out because both sources take comparable time in the mock implementation. If the similarity search is noticeably slower (for example, add a Thread.sleep(100) to it), the picture changes:



INFO app.searchfetchproto.source.DocumentInfoSource - Document IDs fetching in batch: NonEmptyList(4, 5, 2, 3, 6, 1)
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 4. It is: None
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 5. It is: Some(List(6))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 3. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 2. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 6. It is: Some(List(5))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 1. It is: Some(List(2, 3))
INFO app.searchfetchproto.source.DocumentInfoSource - Document IDs fetching in batch: NonEmptyList(5, 2, 3, 6, 1)

  [Round 1]  0.13 seconds
    [Batch] From `Similar Document Source` with ids List(4, 5, 2, 3, 6, 1)  0.13 seconds
    [Batch] From `Document Info Source` with ids List(4, 5, 2, 3, 6, 1)  0.08 seconds
  [Round 2]  0.00 seconds
    [Batch] From `Document Info Source` with ids List(5, 2, 3, 6, 1)  0.00 seconds
    [Batch] From `Persons source` with ids List(1, 2, 3)  0.00 seconds


Now the document infos requested inside fetchSimilarItems form their own batch in a second round: how requests are grouped into rounds, and therefore into batches, depends on the relative timing of the sources.





Conclusion

Fetch makes it straightforward to describe access to heterogeneous data sources and to get deduplication, batching, parallelism and caching almost for free, while keeping the business code clean. Since it is built on Cats, Fetch fits naturally into a typical functional Scala stack alongside libraries such as Doobie and fs2.





Similar libraries:

  • ZQuery, an analogous library built on ZIO rather than Cats;
  • Clump, an older library with ideas similar to Fetch, not developed since 2015;
  • Haxl, the Haskell library from Facebook that pioneered the approach.


Both ZQuery and Fetch build on the ideas of the paper "There is no Fork: an Abstraction for Efficient, Concurrent, and Concise Data Access" (https://simonmar.github.io/bib/papers/haxl-icfp14.pdf), which (among other things) describes the approach behind Haxl.







