diff --git a/.travis.yml b/.travis.yml
index 3be7b21..c308f00 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,25 +1,39 @@
language: scala
sudo: required
dist: trusty
-scala:
- - 2.10.6
- - 2.11.11
- - 2.12.2
-jdk:
- - oraclejdk7
- - oraclejdk8
+group: edge
+
matrix:
- exclude:
- - scala: 2.12.2
- jdk: oraclejdk7
+ include:
+ - jdk: oraclejdk8
+ scala: 2.10.6
+ env: COMMAND=ci PUBLISH=
+ - jdk: oraclejdk8
+ scala: 2.11.11
+ env: COMMAND=ci PUBLISH=
+ - jdk: oraclejdk8
+ scala: 2.12.3
+ env: COMMAND=ci PUBLISH=true
+
+env:
+ global:
+ - secure: GRdfKNrJn/zqjaDWE+16HCfuCSf/wsDpLHocxrOSDiW6QCy73a+MYCujfB989YndQkrmGVkzdmAyKhcfTyYW/Sqjh/sJc2OOc6p+4CeMOGRcLV73wTwi9PjsrzzN0260HnICq3X+3ZUiLdkWoJPLfD6Mflj9iRjJBQIOtV0LzeU=
+ - secure: SPSIblLKFVns7pVY1x3SEs4/16htY5HUzRC51uWXeESE7Nwi3SvBY8LE2BqHygQl/9wKKOdOKoCIBoftukWupIi/r1rT2nVFHremO23Y36hcffN+PFXtW6NIohwIoX34O6G7VGuS2b71IZQHqwr88bY4aHeU4jI3MtU3nXhbEMI=
+ - secure: YVx2BSSsqF7LdYTwinf6o8nqJiYL9FeFAm1HDLxt+ltuMAEbFprOEDA763FANZoUino0uYtOBQ9jWqgMsoo+DvWFrBk4eExC9jGRk7Y/aWw6lx+TCbISGYztkhREQf73JKjbejoxLXf9h9gfo3MpPdrQhzMd2zVKOgSNf8FddZA=
+
+script:
+ - sbt -J-Xmx6144m ++$TRAVIS_SCALA_VERSION $COMMAND
+after_success:
+ - ./project/publish
+
services:
- docker
+
before_install:
- sudo service memcached stop
- docker pull memcached
- docker run -d -p 127.0.0.1:11211:11211 memcached memcached
-script: "sbt clean coverage test"
-after_success: "sbt coverageReport coveralls"
+
cache:
directories:
- $HOME/.sbt/0.13
diff --git a/README.md b/README.md
index edfe6d6..1390849 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,6 @@
# Shade - Memcached Client for Scala
[![Build Status](https://travis-ci.org/monix/shade.svg?branch=master)](https://travis-ci.org/monix/shade)
-[![Coverage Status](https://coveralls.io/repos/github/alexandru/shade/badge.svg?branch=master)](https://coveralls.io/github/alexandru/shade?branch=master)
[![Gitter](https://badges.gitter.im/monix/shade.svg)](https://gitter.im/monix/shade?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
## Overview
@@ -23,6 +22,7 @@ Supported for Scala versions: 2.10, 2.11 and 2.12.
## Release Notes
+- [Version 1.10.x](release-notes/1.10.md)
- [Version 1.9.x](release-notes/1.9.md)
- [Version 1.8.x](release-notes/1.8.md)
- [Version 1.7.x](release-notes/1.7.md)
@@ -38,7 +38,7 @@ These are the people maintaining this project that you can annoy:
## Usage From SBT
```scala
-dependencies += "io.monix" %% "shade" % "1.9.5"
+dependencies += "io.monix" %% "shade" % "1.10.0"
```
### Initializing the Memcached Client
diff --git a/build.sbt b/build.sbt
index bfd1edf..2a30555 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,13 +1,11 @@
name := "shade"
-
-version := "1.9.5"
-
organization := "io.monix"
-scalaVersion := "2.11.11"
-
-crossScalaVersions := Seq("2.10.6", "2.11.11", "2.12.2")
+addCommandAlias("ci", ";clean ;compile ;test ;package")
+addCommandAlias("release", ";+publishSigned ;sonatypeReleaseAll")
+scalaVersion := "2.11.11"
+crossScalaVersions := Seq("2.10.6", "2.11.11", "2.12.3")
compileOrder in ThisBuild := CompileOrder.JavaThenScala
scalacOptions ++= {
@@ -56,9 +54,7 @@ scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
scalacOptions in (Compile, doc) ~= (_ filterNot (_ == "-Xfatal-warnings"))
resolvers ++= Seq(
- "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases",
- "Spy" at "http://files.couchbase.com/maven2/",
- Resolver.sonatypeRepo("snapshots")
+ "Spy" at "http://files.couchbase.com/maven2/"
)
libraryDependencies ++= Seq(
@@ -72,47 +68,76 @@ libraryDependencies ++= Seq(
libraryDependencies += ("org.scala-lang" % "scala-reflect" % scalaVersion.value % "compile")
-// -- Settings meant for deployment on oss.sonatype.org
+//------------- For Release
-useGpg := true
-useGpgAgent := true
+useGpg := false
usePgpKeyHex("2673B174C4071B0E")
+pgpPublicRing := baseDirectory.value / "project" / ".gnupg" / "pubring.gpg"
+pgpSecretRing := baseDirectory.value / "project" / ".gnupg" / "secring.gpg"
+pgpPassphrase := sys.env.get("PGP_PASS").map(_.toArray)
+
+enablePlugins(GitVersioning)
+
+/* The BaseVersion setting represents the in-development (upcoming) version,
+ * as an alternative to SNAPSHOTS.
+ */
+git.baseVersion := "1.11.0"
+
+val ReleaseTag = """^v([\d\.]+)$""".r
+git.gitTagToVersionNumber := {
+ case ReleaseTag(v) => Some(v)
+ case _ => None
+}
+
+git.formattedShaVersion := {
+ val suffix = git.makeUncommittedSignifierSuffix(git.gitUncommittedChanges.value, git.uncommittedSignifier.value)
+
+ git.gitHeadCommit.value map { _.substring(0, 7) } map { sha =>
+ git.baseVersion.value + "-" + sha + suffix
+ }
+}
+
+sonatypeProfileName := organization.value
+
+credentials += Credentials(
+ "Sonatype Nexus Repository Manager",
+ "oss.sonatype.org",
+ sys.env.getOrElse("SONATYPE_USER", ""),
+ sys.env.getOrElse("SONATYPE_PASS", "")
+)
publishMavenStyle := true
-publishTo := {
- val nexus = "https://oss.sonatype.org/"
+isSnapshot := version.value endsWith "SNAPSHOT"
+
+publishTo := Some(
if (isSnapshot.value)
- Some("snapshots" at nexus + "content/repositories/snapshots")
+ Opts.resolver.sonatypeSnapshots
else
- Some("releases" at nexus + "service/local/staging/deploy/maven2")
-}
+ Opts.resolver.sonatypeStaging
+)
publishArtifact in Test := false
pomIncludeRepository := { _ => false } // removes optional dependencies
scalariformSettings
-pomExtra in ThisBuild :=
- https://github.com/monix/shade
-
-
- The MIT License
- http://opensource.org/licenses/MIT
- repo
-
-
-
- git@github.com:monix/shade.git
- scm:git:git@github.com:monix/shade.git
-
-
-
- alex_ndc
- Alexandru Nedelcu
- https://alexn.org
-
-
+licenses := Seq("MIT" -> url("https://opensource.org/licenses/MIT"))
+homepage := Some(url("https://github.com/monix/shade"))
+
+scmInfo := Some(
+ ScmInfo(
+ url("https://github.com/monix/shade.git"),
+ "scm:git@github.com:monix/shade.git"
+ ))
+
+developers := List(
+ Developer(
+ id="alexelcu",
+ name="Alexandru Nedelcu",
+ email="noreply@alexn.org",
+ url=url("https://alexn.org")
+ ))
// Multi-project-related
diff --git a/project/.gnupg/pubring.gpg b/project/.gnupg/pubring.gpg
new file mode 100644
index 0000000..a7a537e
Binary files /dev/null and b/project/.gnupg/pubring.gpg differ
diff --git a/project/.gnupg/secring.gpg b/project/.gnupg/secring.gpg
new file mode 100644
index 0000000..3a7a327
Binary files /dev/null and b/project/.gnupg/secring.gpg differ
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 939dc20..a29f8ca 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,11 +1,7 @@
resolvers += Classpaths.sbtPluginReleases
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
-
+addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
-
-addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
-
-addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.1.0")
-
-addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.18")
\ No newline at end of file
+addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.18")
+addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.1")
+addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.9.3")
diff --git a/project/publish b/project/publish
new file mode 100755
index 0000000..2f196c8
--- /dev/null
+++ b/project/publish
@@ -0,0 +1,43 @@
+#!/usr/bin/env ruby
+
+def exec(cmd)
+ abort("Error encountered, aborting") unless system(cmd)
+end
+
+puts "CI=#{ENV['CI']}"
+puts "TRAVIS_BRANCH=#{ENV['TRAVIS_BRANCH']}"
+puts "TRAVIS_PULL_REQUEST=#{ENV['TRAVIS_PULL_REQUEST']}"
+puts "PUBLISH=#{ENV['PUBLISH']}"
+puts "SONATYPE_USER=xxxx" if ENV['SONATYPE_USER']
+puts "SONATYPE_PASS=xxxx" if ENV['SONATYPE_PASS']
+puts "PGP_PASS=xxxx" if ENV['PGP_PASS']
+puts
+
+unless ENV['CI'] == 'true'
+ abort("ERROR: Not running on top of Travis, aborting!")
+end
+
+unless ENV['PUBLISH'] == 'true'
+ puts "Publish is disabled"
+ exit
+end
+
+branch = ENV['TRAVIS_BRANCH']
+version = nil
+
+unless branch =~ /^v(\d+\.\d+\.\d+)$/ ||
+ (branch == "snapshot" && ENV['TRAVIS_PULL_REQUEST'] == 'false')
+
+  puts "Only publishing for version tags or the `snapshot` branch, " +
+    "and not for pull requests or other branches, exiting!"
+ exit 0
+else
+ version = $1
+ puts "Version branch detected: #{version}" if version
+end
+
+# Forcing a change to the root directory, if not there already
+Dir.chdir(File.absolute_path(File.join(File.dirname(__FILE__), "..")))
+
+# Go, go, go
+exec("sbt release")
diff --git a/release-notes/1.10.md b/release-notes/1.10.md
new file mode 100644
index 0000000..7741666
--- /dev/null
+++ b/release-notes/1.10.md
@@ -0,0 +1,7 @@
+# Version 1.10.0 - Aug 16, 2017
+
+- Update SBT to 0.13.15
+- Update Scala versions
+- Update SpyMemcached
+- Add ability to configure the `timeoutThreshold`
+- Configure project for automatic releases from Travis
diff --git a/src/main/scala/shade/memcached/Codec.scala b/src/main/scala/shade/memcached/Codec.scala
index 74994c2..21adf43 100644
--- a/src/main/scala/shade/memcached/Codec.scala
+++ b/src/main/scala/shade/memcached/Codec.scala
@@ -11,140 +11,191 @@
package shade.memcached
-import java.io._
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, Closeable, ObjectOutputStream}
+
+import monix.execution.misc.NonFatal
+import net.spy.memcached.CachedData
+import net.spy.memcached.transcoders._
import scala.annotation.implicitNotFound
-import scala.language.implicitConversions
+import net.spy.memcached.CachedData.MAX_SIZE
+
import scala.reflect.ClassTag
-import scala.util.control.NonFatal
/**
- * Represents a type class that needs to be implemented
- * for serialization/deserialization to work.
- */
-@implicitNotFound("Could not find any Codec implementation for type ${T}. Please provide one or import shade.memcached.MemcachedCodecs._")
-trait Codec[T] {
- def serialize(value: T): Array[Byte]
- def deserialize(data: Array[Byte]): T
+ * Represents a type class that needs to be implemented
+ * for serialization/deserialization to work.
+ */
+@implicitNotFound("Could not find any Codec implementation for type ${T}.")
+trait Codec[T] extends Transcoder[T] {
+ /**
+ * Returns `true` if the decoding needs to happen asynchronously,
+ * or `false` otherwise.
+ *
+ * Decoding should be marked for asynchrony in case it is
+ * expensive, for example when compression is applied.
+ */
+ def asyncDecode(d: CachedData): Boolean
+
+ /**
+ * Encode the given value to a byte array with flags attached,
+ * meant for storage by the Memcached client.
+ */
+ def encode(value: T): CachedData
+
+ /**
+ * Decodes byte arrays with flags, as retrieved by the Memcached client,
+ * into the value it represents.
+ */
+ def decode(data: CachedData): T
+
+ /** Get the maximum size of objects handled by this codec. */
+ def getMaxSize: Int
}
-object Codec extends BaseCodecs
-
-trait BaseCodecs {
- implicit object IntBinaryCodec extends Codec[Int] {
- def serialize(value: Int): Array[Byte] =
- Array(
- (value >>> 24).asInstanceOf[Byte],
- (value >>> 16).asInstanceOf[Byte],
- (value >>> 8).asInstanceOf[Byte],
- value.asInstanceOf[Byte]
- )
-
- def deserialize(data: Array[Byte]): Int =
- (data(0).asInstanceOf[Int] & 255) << 24 |
- (data(1).asInstanceOf[Int] & 255) << 16 |
- (data(2).asInstanceOf[Int] & 255) << 8 |
- data(3).asInstanceOf[Int] & 255
- }
+object Codec extends DefaultCodecs
- implicit object DoubleBinaryCodec extends Codec[Double] {
- import java.lang.{ Double => JvmDouble }
- def serialize(value: Double): Array[Byte] = {
- val l = JvmDouble.doubleToLongBits(value)
- LongBinaryCodec.serialize(l)
- }
+trait DefaultCodecs extends DefaultCodecsLevel0 {
+
+ import java.lang.{Float => JvmFloat, Double => JvmDouble}
+
+ /** Default codec for `Short`. */
+ implicit object ShortBinaryCodec extends GenericIntCodec[Short](
+ flags = 2 << 8, // SerializingTranscoder.SPECIAL_INT
+ toInt = (v: Short) => v.toInt,
+ fromInt = (v: Int) => v.toShort
+ )
+
+ /** Default codec for `Char`. */
+ implicit object CharBinaryCodec extends GenericIntCodec[Char](
+ flags = 2 << 8, // SerializingTranscoder.SPECIAL_INT
+ toInt = (v: Char) => v.toInt,
+ fromInt = (v: Int) => v.toChar
+ )
+
+ /** Default codec for `Int`. */
+ implicit object IntBinaryCodec extends GenericIntCodec[Int](
+ flags = 2 << 8, // SerializingTranscoder.SPECIAL_INT
+ toInt = (v: Int) => v,
+ fromInt = (v: Int) => v
+ )
+
+ /** Default codec for `Long`. */
+ implicit object LongBinaryCodec extends GenericLongCodec[Long](
+ flags = 3 << 8, // SerializingTranscoder.SPECIAL_LONG
+ toLong = (v: Long) => v,
+ fromLong = (v: Long) => v
+ )
- def deserialize(data: Array[Byte]): Double = {
- val l = LongBinaryCodec.deserialize(data)
- JvmDouble.longBitsToDouble(l)
+ /** Default codec for `Float`. */
+ implicit object FloatBinaryCodec extends GenericIntCodec[Float](
+ flags = 6 << 8, // SerializingTranscoder.SPECIAL_FLOAT
+ toInt = JvmFloat.floatToRawIntBits,
+ fromInt = JvmFloat.intBitsToFloat
+ )
+
+ /** Default codec for `Double`. */
+ implicit object DoubleBinaryCodec extends GenericLongCodec[Double](
+ flags = 7 << 8, // SerializingTranscoder.SPECIAL_DOUBLE
+ toLong = JvmDouble.doubleToRawLongBits,
+ fromLong = JvmDouble.longBitsToDouble
+ )
+
+ /** Default codec for `Byte`. */
+ implicit object ByteBinaryCodec extends Codec[Byte] {
+ final val FLAGS = 5 << 8 // SerializingTranscoder.SPECIAL_BYTE
+
+ def asyncDecode(d: CachedData): Boolean = false
+
+ def encode(value: Byte): CachedData = {
+ val bytes = packedUtils.encodeByte(value)
+ new CachedData(FLAGS, bytes, getMaxSize)
}
+
+ def decode(data: CachedData): Byte =
+ data.getData match {
+ case null => 0
+ case bytes =>
+ packedUtils.decodeByte(bytes)
+ }
+
+ def getMaxSize: Int =
+ MAX_SIZE
}
- implicit object FloatBinaryCodec extends Codec[Float] {
- import java.lang.{ Float => JvmFloat }
- def serialize(value: Float): Array[Byte] = {
- val i = JvmFloat.floatToIntBits(value)
- IntBinaryCodec.serialize(i)
- }
+ /** Default codec for `Boolean`. */
+ implicit object BooleanCodec extends Codec[Boolean] {
+ // SerializingTranscoder.SPECIAL_BOOLEAN
+ final val FLAGS = 1 << 8
+
+ def asyncDecode(d: CachedData): Boolean = false
- def deserialize(data: Array[Byte]): Float = {
- val i = IntBinaryCodec.deserialize(data)
- JvmFloat.intBitsToFloat(i)
+ def encode(value: Boolean): CachedData = {
+ val bytes = packedUtils.encodeBoolean(value)
+ new CachedData(FLAGS, bytes, getMaxSize)
}
- }
- implicit object LongBinaryCodec extends Codec[Long] {
- def serialize(value: Long): Array[Byte] =
- Array(
- (value >>> 56).asInstanceOf[Byte],
- (value >>> 48).asInstanceOf[Byte],
- (value >>> 40).asInstanceOf[Byte],
- (value >>> 32).asInstanceOf[Byte],
- (value >>> 24).asInstanceOf[Byte],
- (value >>> 16).asInstanceOf[Byte],
- (value >>> 8).asInstanceOf[Byte],
- value.asInstanceOf[Byte]
- )
-
- def deserialize(data: Array[Byte]): Long =
- (data(0).asInstanceOf[Long] & 255) << 56 |
- (data(1).asInstanceOf[Long] & 255) << 48 |
- (data(2).asInstanceOf[Long] & 255) << 40 |
- (data(3).asInstanceOf[Long] & 255) << 32 |
- (data(4).asInstanceOf[Long] & 255) << 24 |
- (data(5).asInstanceOf[Long] & 255) << 16 |
- (data(6).asInstanceOf[Long] & 255) << 8 |
- data(7).asInstanceOf[Long] & 255
+ def decode(data: CachedData): Boolean =
+ data.getData match {
+ case null => false
+ case bytes =>
+ packedUtils.decodeBoolean(bytes)
+ }
+
+ def getMaxSize: Int =
+ MAX_SIZE
}
- implicit object BooleanBinaryCodec extends Codec[Boolean] {
- def serialize(value: Boolean): Array[Byte] =
- Array((if (value) 1 else 0).asInstanceOf[Byte])
- def deserialize(data: Array[Byte]): Boolean =
- data.isDefinedAt(0) && data(0) == 1
- }
+ implicit def ArrayBinaryCodec: Codec[Array[Byte]] = new Codec[Array[Byte]] {
+ private[this] val tc = new SerializingTranscoder()
- implicit object CharBinaryCodec extends Codec[Char] {
- def serialize(value: Char): Array[Byte] = Array(
- (value >>> 8).asInstanceOf[Byte],
- value.asInstanceOf[Byte]
- )
+ def asyncDecode(d: CachedData): Boolean = tc.asyncDecode(d)
- def deserialize(data: Array[Byte]): Char =
- ((data(0).asInstanceOf[Int] & 255) << 8 |
- data(1).asInstanceOf[Int] & 255)
- .asInstanceOf[Char]
- }
+ def encode(value: Array[Byte]): CachedData = tc.encode(value)
- implicit object ShortBinaryCodec extends Codec[Short] {
- def serialize(value: Short): Array[Byte] = Array(
- (value >>> 8).asInstanceOf[Byte],
- value.asInstanceOf[Byte]
- )
+ def decode(data: CachedData): Array[Byte] = tc.decode(data).asInstanceOf[Array[Byte]]
- def deserialize(data: Array[Byte]): Short =
- ((data(0).asInstanceOf[Short] & 255) << 8 |
- data(1).asInstanceOf[Short] & 255)
- .asInstanceOf[Short]
+ def getMaxSize: Int = tc.getMaxSize
}
- implicit object StringBinaryCodec extends Codec[String] {
- def serialize(value: String): Array[Byte] = value.getBytes("UTF-8")
- def deserialize(data: Array[Byte]): String = new String(data, "UTF-8")
- }
+ implicit def StringBinaryCodec: Codec[String] = new Codec[String] {
+ private[this] val tc = new SerializingTranscoder()
+ def asyncDecode(d: CachedData): Boolean = tc.asyncDecode(d)
+
+ def encode(value: String): CachedData = tc.encode(value)
+
+ def decode(data: CachedData): String = tc.decode(data).asInstanceOf[String]
- implicit object ArrayByteBinaryCodec extends Codec[Array[Byte]] {
- def serialize(value: Array[Byte]): Array[Byte] = value
- def deserialize(data: Array[Byte]): Array[Byte] = data
+ def getMaxSize: Int = tc.getMaxSize
}
+
+
}
-trait GenericCodec {
+private[memcached] trait DefaultCodecsLevel0 {
private[this] class GenericCodec[S <: Serializable](classTag: ClassTag[S]) extends Codec[S] {
- def using[T <: Closeable, R](obj: T)(f: T => R): R =
+ private[this] val tc = new SerializingTranscoder()
+ def asyncDecode(d: CachedData): Boolean =
+ tc.asyncDecode(d)
+
+ def encode(value: S): CachedData = {
+ if (value == null) throw new NullPointerException("Null values not supported!")
+ tc.encode(serialize(value))
+ }
+
+ def decode(data: CachedData): S =
+ tc.decode(data) match {
+ case value: Array[Byte] => deserialize(value)
+ case _ => throw new NullPointerException("Null values not supported!")
+ }
+
+ def getMaxSize: Int =
+ tc.getMaxSize
+
+ private def using[T <: Closeable, R](obj: T)(f: T => R): R =
try
f(obj)
finally
@@ -152,30 +203,75 @@ trait GenericCodec {
case NonFatal(_) => // does nothing
}
- def serialize(value: S): Array[Byte] =
- using (new ByteArrayOutputStream()) { buf =>
- using (new ObjectOutputStream(buf)) { out =>
+ private def serialize(value: S): Array[Byte] =
+ using(new ByteArrayOutputStream()) { buf =>
+ using(new ObjectOutputStream(buf)) { out =>
out.writeObject(value)
out.close()
buf.toByteArray
}
}
- def deserialize(data: Array[Byte]): S =
- using (new ByteArrayInputStream(data)) { buf =>
+ private def deserialize(data: Array[Byte]): S =
+ using(new ByteArrayInputStream(data)) { buf =>
val in = new GenericCodecObjectInputStream(classTag, buf)
- using (in) { inp =>
+ using(in) { inp =>
inp.readObject().asInstanceOf[S]
}
}
}
- implicit def AnyRefBinaryCodec[S <: Serializable](implicit ev: ClassTag[S]): Codec[S] =
+ implicit def serializingCodec[S <: Serializable](implicit ev: ClassTag[S]): Codec[S] =
new GenericCodec[S](ev)
-}
-trait MemcachedCodecs extends BaseCodecs with GenericCodec
+ /** Helper for building codecs that serialize/deserialize to and from `Long`. */
+ class GenericLongCodec[A](flags: Int, toLong: A => Long, fromLong: Long => A) extends Codec[A] {
+ final val FLAGS = flags
+
+ final def asyncDecode(d: CachedData): Boolean =
+ false
+
+ final def encode(value: A): CachedData = {
+ val bytes = packedUtils.encodeLong(toLong(value))
+ new CachedData(FLAGS, bytes, MAX_SIZE)
+ }
+
+ final def decode(data: CachedData): A =
+ fromLong(data.getData match {
+ case null => 0
+ case bytes =>
+ packedUtils.decodeLong(bytes)
+ })
+
+ final def getMaxSize: Int =
+ MAX_SIZE
+ }
+
+ /** Helper for building codecs that serialize/deserialize to and from `Int`. */
+ class GenericIntCodec[A](flags: Int, toInt: A => Int, fromInt: Int => A) extends Codec[A] {
+ final val FLAGS = flags
+
+ final def asyncDecode(d: CachedData): Boolean =
+ false
+
+ final def encode(value: A): CachedData = {
+ val bytes = packedUtils.encodeInt(toInt(value))
+ new CachedData(FLAGS, bytes, MAX_SIZE)
+ }
+
+ final def decode(data: CachedData): A =
+ fromInt(data.getData match {
+ case null => 0
+ case bytes =>
+ packedUtils.decodeInt(bytes)
+ })
+
+ final def getMaxSize: Int =
+ MAX_SIZE
+ }
-object MemcachedCodecs extends MemcachedCodecs
+ protected final val packedUtils =
+ new TranscoderUtils(true)
+}
\ No newline at end of file
diff --git a/src/main/scala/shade/memcached/FakeMemcached.scala b/src/main/scala/shade/memcached/FakeMemcached.scala
index f15411a..25d01f1 100644
--- a/src/main/scala/shade/memcached/FakeMemcached.scala
+++ b/src/main/scala/shade/memcached/FakeMemcached.scala
@@ -12,6 +12,7 @@
package shade.memcached
import monix.execution.CancelableFuture
+import net.spy.memcached.CachedData
import shade.UnhandledStatusException
import shade.inmemory.InMemoryCache
@@ -26,7 +27,7 @@ class FakeMemcached(context: ExecutionContext) extends Memcached {
case null =>
CancelableFuture.successful(false)
case _ =>
- CancelableFuture.successful(cache.add(key, codec.serialize(value).toSeq, exp))
+ CancelableFuture.successful(cache.add[CachedData](key, codec.encode(value), exp))
}
def set[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Unit] =
@@ -34,52 +35,48 @@ class FakeMemcached(context: ExecutionContext) extends Memcached {
case null =>
CancelableFuture.successful(())
case _ =>
- CancelableFuture.successful(cache.set(key, codec.serialize(value).toSeq, exp))
+ CancelableFuture.successful(cache.set[CachedData](key, codec.encode(value), exp))
}
def delete(key: String): CancelableFuture[Boolean] =
CancelableFuture.successful(cache.delete(key))
def get[T](key: String)(implicit codec: Codec[T]): Future[Option[T]] =
- Future.successful(cache.get[Seq[Byte]](key)).map(_.map(x => codec.deserialize(x.toArray)))
+ Future.successful(cache.get[CachedData](key).map(codec.decode))
def compareAndSet[T](key: String, expecting: Option[T], newValue: T, exp: Duration)(implicit codec: Codec[T]): Future[Boolean] =
- Future.successful(cache.compareAndSet(key, expecting.map(x => codec.serialize(x).toSeq), codec.serialize(newValue).toSeq, exp))
+ Future.successful {
+ val current = cache.get[CachedData](key)
+ if (current.map(codec.decode) == expecting) {
+ cache.set(key, codec.encode(newValue), exp)
+ true
+ } else {
+ false
+ }
+ }
def transformAndGet[T](key: String, exp: Duration)(cb: (Option[T]) => T)(implicit codec: Codec[T]): Future[T] =
- Future.successful(cache.transformAndGet[Seq[Byte]](key: String, exp) { current =>
- val cValue = current.map(x => codec.deserialize(x.toArray))
- val update = cb(cValue)
- codec.serialize(update).toSeq
- }) map { update =>
- codec.deserialize(update.toArray)
- }
+ Future.successful(cache.transformAndGet[CachedData](key, exp)(o => codec.encode(cb(o.map(codec.decode))))).map(codec.decode)
def getAndTransform[T](key: String, exp: Duration)(cb: (Option[T]) => T)(implicit codec: Codec[T]): Future[Option[T]] =
- Future.successful(cache.getAndTransform[Seq[Byte]](key: String, exp) { current =>
- val cValue = current.map(x => codec.deserialize(x.toArray))
- val update = cb(cValue)
- codec.serialize(update).toSeq
- }) map { update =>
- update.map(x => codec.deserialize(x.toArray))
- }
+ Future.successful(cache.getAndTransform[CachedData](key: String, exp)(o => codec.encode(cb(o.map(codec.decode))))).map(c => c.map(codec.decode))
def increment(key: String, by: Long, default: Option[Long], exp: Duration): Future[Long] = {
- def toBigInt(bytes: Seq[Byte]): BigInt = BigInt(new String(bytes.toArray))
- Future.successful(cache.transformAndGet[Seq[Byte]](key, exp) {
- case Some(current) => (toBigInt(current) + by).toString.getBytes
- case None if default.isDefined => default.get.toString.getBytes
+ def toBigInt(bytes: Array[Byte]): BigInt = BigInt(new String(bytes))
+ Future.successful(cache.transformAndGet[CachedData](key, exp) {
+ case Some(current) => new CachedData(0, (toBigInt(current.getData) + by).toString.getBytes, Int.MaxValue)
+ case None if default.isDefined => new CachedData(0, default.get.toString.getBytes, Int.MaxValue)
case None => throw new UnhandledStatusException(s"For key $key - CASNotFoundStatus")
- }).map(toBigInt).map(_.toLong)
+ }).map(c => toBigInt(c.getData)).map(_.toLong)
}
def decrement(key: String, by: Long, default: Option[Long], exp: Duration): Future[Long] = {
- def toBigInt(bytes: Seq[Byte]): BigInt = BigInt(new String(bytes.toArray))
- Future.successful(cache.transformAndGet[Seq[Byte]](key, exp) {
- case Some(current) => (toBigInt(current) - by).max(0).toString.getBytes
- case None if default.isDefined => default.get.toString.getBytes
+ def toBigInt(bytes: Array[Byte]): BigInt = BigInt(new String(bytes))
+ Future.successful(cache.transformAndGet[CachedData](key, exp) {
+ case Some(current) => new CachedData(0, (toBigInt(current.getData) - by).max(0).toString.getBytes, Int.MaxValue)
+ case None if default.isDefined => new CachedData(0, default.get.toString.getBytes, Int.MaxValue)
case None => throw new UnhandledStatusException(s"For key $key - CASNotFoundStatus")
- }).map(toBigInt).map(_.toLong)
+ }).map(c => toBigInt(c.getData)).map(_.toLong)
}
def close(): Unit = {
diff --git a/src/main/scala/shade/memcached/MemcachedImpl.scala b/src/main/scala/shade/memcached/MemcachedImpl.scala
index ab7de5d..efb5034 100644
--- a/src/main/scala/shade/memcached/MemcachedImpl.scala
+++ b/src/main/scala/shade/memcached/MemcachedImpl.scala
@@ -48,7 +48,7 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
case null =>
CancelableFuture.successful(false)
case _ =>
- instance.realAsyncAdd(withPrefix(key), codec.serialize(value), 0, exp, config.operationTimeout) map {
+ instance.realAsyncAdd(withPrefix(key), codec.encode(value), exp, config.operationTimeout) map {
case SuccessfulResult(givenKey, Some(_)) =>
true
case SuccessfulResult(givenKey, None) =>
@@ -68,7 +68,7 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
case null =>
CancelableFuture.successful(())
case _ =>
- instance.realAsyncSet(withPrefix(key), codec.serialize(value), 0, exp, config.operationTimeout) map {
+ instance.realAsyncSet(withPrefix(key), codec.encode(value), exp, config.operationTimeout) map {
case SuccessfulResult(givenKey, _) =>
()
case failure: FailedResult =>
@@ -97,7 +97,7 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
def get[T](key: String)(implicit codec: Codec[T]): Future[Option[T]] =
instance.realAsyncGet(withPrefix(key), config.operationTimeout) map {
case SuccessfulResult(givenKey, option) =>
- option.map(codec.deserialize)
+ option.map(codec.decode)
case failure: FailedResult =>
throwExceptionOn(failure)
}
@@ -126,8 +126,8 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
Future.successful(false)
case SuccessfulResult(givenKey, Some((currentData, casID))) =>
- if (codec.deserialize(currentData) == expectingValue)
- instance.realAsyncCAS(withPrefix(key), casID, 0, codec.serialize(newValue), exp, config.operationTimeout) map {
+ if (codec.decode(currentData) == expectingValue)
+ instance.realAsyncCAS(withPrefix(key), casID, codec.encode(newValue), exp, config.operationTimeout) map {
case SuccessfulResult(_, bool) =>
bool
case failure: FailedResult =>
@@ -172,10 +172,10 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach
loop(retry + 1)
}
case SuccessfulResult(_, Some((current, casID))) =>
- val currentOpt = Some(codec.deserialize(current))
+ val currentOpt = Some(codec.decode(current))
val result = cb(currentOpt)
- instance.realAsyncCAS(keyWithPrefix, casID, 0, codec.serialize(result), exp, remainingTime.millis) flatMap {
+ instance.realAsyncCAS(keyWithPrefix, casID, codec.encode(result), exp, remainingTime.millis) flatMap {
case SuccessfulResult(_, true) =>
Future.successful(f(currentOpt, result))
case SuccessfulResult(_, false) =>
diff --git a/src/main/scala/shade/memcached/internals/SpyMemcachedIntegration.scala b/src/main/scala/shade/memcached/internals/SpyMemcachedIntegration.scala
index 41d49f9..c273ac5 100644
--- a/src/main/scala/shade/memcached/internals/SpyMemcachedIntegration.scala
+++ b/src/main/scala/shade/memcached/internals/SpyMemcachedIntegration.scala
@@ -178,9 +178,9 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres
}
}
- def realAsyncGet(key: String, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[Array[Byte]]]] = {
- val promise = Promise[Result[Option[Array[Byte]]]]()
- val result = new MutablePartialResult[Option[Array[Byte]]]
+ def realAsyncGet(key: String, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[CachedData]]] = {
+ val promise = Promise[Result[Option[CachedData]]]()
+ val result = new MutablePartialResult[Option[CachedData]]
val op: GetOperation = opFact.get(key, new GetOperation.Callback {
def receivedStatus(opStatus: OperationStatus) {
@@ -193,7 +193,7 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres
def gotData(k: String, flags: Int, data: Array[Byte]) {
assert(key == k, "Wrong key returned")
- result.tryComplete(Success(SuccessfulResult(key, Option(data))))
+ result.tryComplete(Success(SuccessfulResult(key, Option(new CachedData(flags, data, data.length)))))
}
def complete() {
@@ -205,11 +205,11 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres
prepareFuture(key, op, promise, timeout)
}
- def realAsyncSet(key: String, data: Array[Byte], flags: Int, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Long]] = {
+ def realAsyncSet(key: String, cachedData: CachedData, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Long]] = {
val promise = Promise[Result[Long]]()
val result = new MutablePartialResult[Long]
- val op: Operation = opFact.store(StoreType.set, key, flags, expiryToSeconds(exp).toInt, data, new StoreOperation.Callback {
+ val op: Operation = opFact.store(StoreType.set, key, cachedData.getFlags, expiryToSeconds(exp).toInt, cachedData.getData, new StoreOperation.Callback {
def receivedStatus(opStatus: OperationStatus) {
handleStatus(opStatus, key, result) {
case CASSuccessStatus =>
@@ -229,11 +229,11 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres
prepareFuture(key, op, promise, timeout)
}
- def realAsyncAdd(key: String, data: Array[Byte], flags: Int, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[Long]]] = {
+ def realAsyncAdd(key: String, cachedData: CachedData, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[Long]]] = {
val promise = Promise[Result[Option[Long]]]()
val result = new MutablePartialResult[Option[Long]]
- val op: Operation = opFact.store(StoreType.add, key, flags, expiryToSeconds(exp).toInt, data, new StoreOperation.Callback {
+ val op: Operation = opFact.store(StoreType.add, key, cachedData.getFlags, expiryToSeconds(exp).toInt, cachedData.getData, new StoreOperation.Callback {
def receivedStatus(opStatus: OperationStatus) {
handleStatus(opStatus, key, result) {
case CASExistsStatus =>
@@ -280,9 +280,9 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres
prepareFuture(key, op, promise, timeout)
}
- def realAsyncGets(key: String, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[(Array[Byte], Long)]]] = {
- val promise = Promise[Result[Option[(Array[Byte], Long)]]]()
- val result = new MutablePartialResult[Option[(Array[Byte], Long)]]
+ def realAsyncGets(key: String, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[(CachedData, Long)]]] = {
+ val promise = Promise[Result[Option[(CachedData, Long)]]]()
+ val result = new MutablePartialResult[Option[(CachedData, Long)]]
val op: Operation = opFact.gets(key, new GetsOperation.Callback {
def receivedStatus(opStatus: OperationStatus) {
@@ -298,7 +298,7 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres
assert(cas > 0, s"CAS was less than zero: $cas")
result.tryComplete(Try {
- SuccessfulResult(key, Option(data).map(d => (d, cas)))
+ SuccessfulResult(key, Option(data).map(d => (new CachedData(flags, data, data.length), cas)))
})
}
@@ -311,11 +311,11 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres
prepareFuture(key, op, promise, timeout)
}
- def realAsyncCAS(key: String, casID: Long, flags: Int, data: Array[Byte], exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Boolean]] = {
+ def realAsyncCAS(key: String, casID: Long, cachedData: CachedData, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Boolean]] = {
val promise = Promise[Result[Boolean]]()
val result = new MutablePartialResult[Boolean]
- val op = opFact.cas(StoreType.set, key, casID, flags, expiryToSeconds(exp).toInt, data, new StoreOperation.Callback {
+ val op = opFact.cas(StoreType.set, key, casID, cachedData.getFlags, expiryToSeconds(exp).toInt, cachedData.getData, new StoreOperation.Callback {
def receivedStatus(opStatus: OperationStatus) {
handleStatus(opStatus, key, result) {
case CASSuccessStatus =>
diff --git a/src/test/scala/shade/tests/CodecsSuite.scala b/src/test/scala/shade/memcached/CodecsSuite.scala
similarity index 77%
rename from src/test/scala/shade/tests/CodecsSuite.scala
rename to src/test/scala/shade/memcached/CodecsSuite.scala
index 412aad8..a48d712 100644
--- a/src/test/scala/shade/tests/CodecsSuite.scala
+++ b/src/test/scala/shade/memcached/CodecsSuite.scala
@@ -9,22 +9,21 @@
* https://github.com/monix/shade/blob/master/LICENSE.txt
*/
-package shade.tests
+package shade.memcached
import org.scalacheck.Arbitrary
import org.scalatest.FunSuite
import org.scalatest.prop.GeneratorDrivenPropertyChecks
-import shade.memcached.{ Codec, MemcachedCodecs }
-class CodecsSuite extends FunSuite with MemcachedCodecs with GeneratorDrivenPropertyChecks {
+class CodecsSuite extends FunSuite with DefaultCodecsLevel0 with DefaultCodecs with GeneratorDrivenPropertyChecks {
/**
* Properties-based checking for a codec of type A
*/
private def serdesCheck[A: Arbitrary](codec: Codec[A]): Unit = {
forAll { n: A =>
- val serialised = codec.serialize(n)
- val deserialised = codec.deserialize(serialised)
+ val serialised = codec.encode(n)
+ val deserialised = codec.decode(serialised)
assert(deserialised == n)
}
}
@@ -46,7 +45,7 @@ class CodecsSuite extends FunSuite with MemcachedCodecs with GeneratorDrivenProp
}
test("BooleanBinaryCodec") {
- serdesCheck(BooleanBinaryCodec)
+ serdesCheck(BooleanCodec)
}
test("CharBinaryCodec") {
@@ -62,7 +61,11 @@ class CodecsSuite extends FunSuite with MemcachedCodecs with GeneratorDrivenProp
}
test("ArrayByteBinaryCodec") {
- serdesCheck(ArrayByteBinaryCodec)
+ serdesCheck(ArrayBinaryCodec)
+ }
+
+ test("ByteBinaryCodec") {
+ serdesCheck(ByteBinaryCodec)
}
}
\ No newline at end of file
diff --git a/src/test/scala/shade/tests/MemcachedTestHelpers.scala b/src/test/scala/shade/tests/MemcachedTestHelpers.scala
index 5938561..f8d8b06 100644
--- a/src/test/scala/shade/tests/MemcachedTestHelpers.scala
+++ b/src/test/scala/shade/tests/MemcachedTestHelpers.scala
@@ -16,7 +16,7 @@ import shade.memcached._
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._
-trait MemcachedTestHelpers extends MemcachedCodecs {
+trait MemcachedTestHelpers extends DefaultCodecs {
val defaultConfig = Configuration(
addresses = "127.0.0.1:11211",
authentication = None,