Skip to content

Commit

Permalink
nested list, map (#3)
Browse files Browse the repository at this point in the history
* map, nested list
  • Loading branch information
vlmiroshnikov authored Oct 18, 2021
1 parent ea2c4ae commit 9932314
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.effect.syntax.all.*
import com.aerospike.client.async.{ EventLoop, EventPolicy, NioEventLoops }
import com.aerospike.client.policy.ClientPolicy
import com.aerospike.client.{ AerospikeClient, Host }

import scala.concurrent.duration.*

trait AeroClient[F[_]]:
Expand All @@ -26,25 +27,25 @@ object AeroClient {
port: Int,
policy: Policy = Policy.default): Resource[F, AeroClient[F]] = {

val init = Async[F].blocking {
val cp = new ClientPolicy() {
timeout = 10.seconds.toSeconds.toInt
tendInterval = 10.seconds.toSeconds.toInt
eventLoops = new NioEventLoops(policy.eventPolicy, -1)
val init = for {
loops <- Resource.fromAutoCloseable(Sync[F].delay(new NioEventLoops(policy.eventPolicy, 2)))
cp <- Resource.pure {
new ClientPolicy() {
timeout = 10.seconds.toSeconds.toInt
tendInterval = 10.seconds.toSeconds.toInt
eventLoops = loops
}
}
ac <- Resource.fromAutoCloseable(
Sync[F].delay(new AerospikeClient(cp, hosts.map(h => new Host(h, port))*)))
} yield (ac, cp)

init.map { (ac, cp) =>
new AeroClient[F] {
override def run[R](func: Context[R] => Unit): F[R] =
summon[Async[F]].async_[R](cb => func(Context(ac, cp.eventLoops.next, cb)))
}
val ac = new AerospikeClient(cp, hosts.map(h => new Host(h, port))*)
(ac, cp)
}


Resource
.make[F, (AerospikeClient, ClientPolicy)](init)(rs => Async[F].blocking(rs._1.close()))
.map { (ac, cp) =>
new AeroClient[F] {
override def run[R](func: Context[R] => Unit): F[R] =
summon[Async[F]].async_[R](cb => func(Context(ac, cp.eventLoops.next, cb)))
}
}
}
}

Expand Down
32 changes: 16 additions & 16 deletions aero-core/src/main/scala/io/github/vlmiroshnikov/aero/Reads.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import cats.*
import cats.syntax.all.*
import com.aerospike.client.*
import com.aerospike.client.async.EventLoop
import com.aerospike.client.listener.{RecordListener, RecordSequenceListener}
import com.aerospike.client.listener.{ RecordListener, RecordSequenceListener }
import com.aerospike.client.policy.BatchPolicy
import com.aerospike.client.query.Statement
import io.github.vlmiroshnikov.aero.codecs.*
import io.github.vlmiroshnikov.aero.{AeroClient, DecoderMagnet, Schema}
import io.github.vlmiroshnikov.aero.codecs.{Encoder, Listeners}
import io.github.vlmiroshnikov.aero.{ AeroClient, DecoderMagnet, Schema }
import io.github.vlmiroshnikov.aero.codecs.{ Encoder, Listeners }

import scala.jdk.CollectionConverters.*
import scala.util.Try
Expand All @@ -35,17 +35,17 @@ def get[F[_], K](
}

def batch[F[_], K](
keys: List[K],
magnet: DecoderMagnet
)(using
ac: AeroClient[F],
keyEncoder: Encoder[K],
schema: Schema): F[List[magnet.Repr]] = {
keys: List[K],
magnet: DecoderMagnet
)(using
ac: AeroClient[F],
keyEncoder: Encoder[K],
schema: Schema): F[List[magnet.Repr]] = {
ac.run[List[magnet.Repr]] { ctx =>

val decoder = magnet.decoder()
val listener = Listeners.listListener(ctx.callback, decoder.decode(_))
val policy = ctx.client.batchPolicyDefault
val policy = ctx.client.batchPolicyDefault

def mkKey(key: K) = new Key(schema.namespace, schema.set, keyEncoder.encode(key))

Expand All @@ -56,11 +56,12 @@ def batch[F[_], K](
}
}


def exists[F[_], K](key: K)(using
ac: AeroClient[F],
keyEncoder: Encoder[K],
schema: Schema): F[Boolean] = {
def exists[F[_], K](
key: K
)(using
ac: AeroClient[F],
keyEncoder: Encoder[K],
schema: Schema): F[Boolean] = {
ac.run[Boolean] { ctx =>

val listener = Listeners.existsListener(ctx.callback)
Expand All @@ -73,7 +74,6 @@ def exists[F[_], K](key: K)(using
}
}


def query[F[_], R](
stm: Statement,
magnet: DecoderMagnet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import com.aerospike.client.cdt.MapOperation
import com.aerospike.client.listener.RecordListener
import com.aerospike.client.policy.WritePolicy
import io.github.vlmiroshnikov.aero.codecs.*
import io.github.vlmiroshnikov.aero.{AeroClient, DecoderMagnet, Schema}
import io.github.vlmiroshnikov.aero.codecs.{Encoder, Listeners, RecordEncoder}
import io.github.vlmiroshnikov.aero.{ AeroClient, DecoderMagnet, Schema }
import io.github.vlmiroshnikov.aero.codecs.{ Encoder, Listeners, RecordEncoder }

def put[F[_], K, V](
key: K,
Expand All @@ -27,7 +27,7 @@ def put[F[_], K, V](
updated.expiration = fd.toSeconds.toInt
updated
}
val keyV = new Key(schema.namespace, schema.set, keyEncoder.encode(key))
val keyV = new Key(schema.namespace, schema.set, keyEncoder.encode(key))
Either
.catchNonFatal(
ctx
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.github.vlmiroshnikov.aero.codecs

import java.util.List as JList

import cats.*
import cats.data.*
import cats.syntax.all.*
Expand All @@ -17,6 +19,29 @@ type Result[T] = Either[Throwable, T]
trait Decoder[T]:
def decode(v: Record, name: String): Result[T]

trait NestedDecoder[T]:
def decode(lst: NestedValue): Result[T]

object NestedDecoder:

given NestedDecoder[String] = (v: NestedValue) =>
v match {
case v: String => v.asRight
case _ => TypeMismatchError(v.toString).asLeft
}

given NestedDecoder[Int] = (v: NestedValue) =>
v match {
case v: Int => v.asRight
case _ => TypeMismatchError(v.toString).asLeft
}

given [T <: PlainType]: NestedDecoder[List[T]] = (lst: NestedValue) =>
lst match {
case v: JList[_] => Right(v.asInstanceOf[JList[T]].asScala.toList)
case _ => TypeMismatchError(lst.toString).asLeft[List[T]]
}

object Decoder:

def instance[R](decoderF: (Record, String) => R) = new Decoder[R] {
Expand All @@ -27,14 +52,28 @@ object Decoder:
override def decode(r: Record, name: String): Result[R] = decoderF(r, name)
}

given [T <: Int | String | Double]: Decoder[List[T]] = new Decoder[List[T]] {
given [K <: PlainType, V: NestedDecoder]: Decoder[Map[K, V]] = (v: Record, name: String) => {
val dec = summon[NestedDecoder[V]]
for {
bin <- Option(v.getMap(name)).toRight(NotFoundBin(name))
lst <- Try(bin.asInstanceOf[java.util.Map[K, AnyRef]].asScala).toEither
res <- lst.toList.traverse {
case (key, nest: java.util.List[NestedValue]) => dec.decode(nest).map(v => key -> v)
case (key, plain: PlainType) => dec.decode(plain).map(v => key -> v)
}
} yield res.toMap[K, V]
}

override def decode(v: Record, name: String): Result[List[T]] = {
for {
bin <- Option(v.getList(name)).toRight(NotFoundBin(name))
lst <- Try(bin.asInstanceOf[java.util.List[T]].asScala.toList).toEither
} yield lst
}
given [T: NestedDecoder]: Decoder[List[T]] = (v: Record, name: String) => {
val dec = summon[NestedDecoder[T]]
for {
bin <- Option(v.getList(name)).toRight(NotFoundBin(name))
lst <- Try(bin.asInstanceOf[java.util.List[AnyRef]].asScala).toEither
res <- lst.toList.traverse {
case nest: java.util.List[PlainType] => dec.decode(nest)
case plain: PlainType => dec.decode(plain)
}
} yield res
}

given Decoder[Int] = instance((r, n) => r.getInt(n))
Expand All @@ -43,3 +82,7 @@ object Decoder:
case class NotFoundBin(bin: String) extends RuntimeException {
override def getMessage: String = s"Not found bin: ${bin}"
}

case class TypeMismatchError(v: String) extends RuntimeException {
override def getMessage: String = "TypeMismatchError"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,47 @@ package io.github.vlmiroshnikov.aero.codecs

import com.aerospike.client.Value
import scala.jdk.CollectionConverters.*
import java.util.List as JList

trait Encoder[T]:
def encode(t: T): Value

type PlainType = Int | Long | String | Double | Boolean
type NestedType = PlainType | List[_]

type NestedValue = PlainType | JList[_]

trait NestedEncoder[T]:
def encode(r: T): NestedValue

object NestedEncoder:

given [T <: NestedType]: NestedEncoder[T] = (r: T) => {
r match {
case v: List[_] => v.asJava
case p: PlainType => p
}
}

object Encoder:

def instance[T](f: T => Value) = new Encoder[T]:
def encode(t: T): Value = f(t)
def instance[T](f: T => Value): Encoder[T] = (t: T) => f(t)

given [T <: Int | String | Double]: Encoder[List[T]] = new Encoder[List[T]] {
given Encoder[String] = Encoder.instance(Value.get)
given Encoder[Int] = Encoder.instance(Value.get)
given Encoder[Long] = Encoder.instance(Value.get)
given Encoder[Double] = Encoder.instance(Value.get)

override def encode(lst: List[T]): Value =
Value.get(lst.asJava)
given [T: NestedEncoder]: Encoder[List[T]] = (lst: List[T]) => {
val enc = summon[NestedEncoder[T]]
val r = lst.map { v => enc.encode(v) }.asJava
Value.get(r)
}

given Encoder[String] = Encoder.instance(Value.get(_))
given Encoder[Int] = Encoder.instance(Value.get(_))
given [K <: PlainType, V: NestedEncoder]: Encoder[Map[K, V]] = (map: Map[K, V]) => {
val enc = summon[NestedEncoder[V]]
Value.get(map.map((k, v) => (k, enc.encode(v))).asJava)
}

object asValue:
def apply[V](v: V)(using encoder: Encoder[V]): Value = encoder.encode(v)
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ object Listeners {
override def onFailure(exception: AerospikeException): Unit = callback(Left(exception))
}

def existsListener(callback: Callback[Boolean]) = new ExistsListener {
override def onSuccess(key: Key, exists: Boolean): Unit = callback(exists.asRight)
def existsListener(callback: Callback[Boolean]): ExistsListener = new ExistsListener {
override def onSuccess(key: Key, exists: Boolean): Unit = callback(exists.asRight)
override def onFailure(exception: AerospikeException): Unit = callback(exception.asLeft)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ object RecordEncoder:
inline m match
case mprod: Mirror.ProductOf[A] =>
lazy val encoders = summonEncodersRec[mprod.MirroredElemTypes]
new RecordEncoder[A] {
override def encode(v: A): List[Bin] =
encodeProduct(v.asInstanceOf[Product], encoders)
}
(v: A) => encodeProduct(v.asInstanceOf[Product], encoders)

case _ => throw new RuntimeException()
}
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Settings._
import xerial.sbt.Sonatype._

val versionV = "0.0.2"
val versionV = "0.0.3"

ThisBuild / version := versionV
ThisBuild / scalaVersion := Versions.dotty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,34 @@ import com.aerospike.client.query.Statement
import io.github.vlmiroshnikov.aero.codecs.*
import io.github.vlmiroshnikov.aero.reads.*
import io.github.vlmiroshnikov.aero.writes.*
import io.github.vlmiroshnikov.aero.codecs.{RecordDecoder, RecordEncoder, asValue}
import munit.*

class IntergrationSuite extends CatsEffectSuite {

val client = ResourceFixture(AeroClient(List("10.232.123.11"), 3000))
case class Data(v: String)

case class Rec(source_sids: List[String]) derives RecordDecoder, RecordEncoder
class IntegrationSuite extends CatsEffectSuite {

given Schema("tss", "report_meta")
val client = ResourceFixture(AeroClient(List("192.168.1.35"), 3000))

client.test("get".ignore) { (ac: AeroClient[IO]) =>
//given NestedEncoder[String] = (r: String) => r
// given NestedDecoder[Data] = (lst: NestedValue) => lst match {
// case s: String => Data(s).asRight
// case s : List[String] => Data(s.tail.head).asRight
// case _ => (new Exception()).asLeft
//
// }

case class Rec(data: List[List[String]], map: Map[String, List[String]]) derives RecordEncoder, RecordDecoder

given Schema("test", "nested")

client.test("get".ignore) { ac =>
given AeroClient[IO] = ac
val rec = Rec(List("3024fe7c-e0cf-4d67-9065-5cde44297c1f", "12", "34"))
val rec = Rec(List(List("a", "b")), Map("key" -> List("1", "2")))
for {
_ <- put("key1", rec)
_ <- put("key2", rec)
r <- batch(List("key1", "key2"), as[Rec])
_ <- put("key", rec)
r <- get("key", as[Rec])
_ <- IO.println(s"res=$r")
} yield assertEquals(r, List(rec, rec))
} yield assertEquals(r, rec.some)
}
}

0 comments on commit 9932314

Please sign in to comment.