diff --git a/.travis.yml b/.travis.yml index 3be7b21..c308f00 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,25 +1,39 @@ language: scala sudo: required dist: trusty -scala: - - 2.10.6 - - 2.11.11 - - 2.12.2 -jdk: - - oraclejdk7 - - oraclejdk8 +group: edge + matrix: - exclude: - - scala: 2.12.2 - jdk: oraclejdk7 + include: + - jdk: oraclejdk8 + scala: 2.10.6 + env: COMMAND=ci PUBLISH= + - jdk: oraclejdk8 + scala: 2.11.11 + env: COMMAND=ci PUBLISH= + - jdk: oraclejdk8 + scala: 2.12.3 + env: COMMAND=ci PUBLISH=true + +env: + global: + - secure: GRdfKNrJn/zqjaDWE+16HCfuCSf/wsDpLHocxrOSDiW6QCy73a+MYCujfB989YndQkrmGVkzdmAyKhcfTyYW/Sqjh/sJc2OOc6p+4CeMOGRcLV73wTwi9PjsrzzN0260HnICq3X+3ZUiLdkWoJPLfD6Mflj9iRjJBQIOtV0LzeU= + - secure: SPSIblLKFVns7pVY1x3SEs4/16htY5HUzRC51uWXeESE7Nwi3SvBY8LE2BqHygQl/9wKKOdOKoCIBoftukWupIi/r1rT2nVFHremO23Y36hcffN+PFXtW6NIohwIoX34O6G7VGuS2b71IZQHqwr88bY4aHeU4jI3MtU3nXhbEMI= + - secure: YVx2BSSsqF7LdYTwinf6o8nqJiYL9FeFAm1HDLxt+ltuMAEbFprOEDA763FANZoUino0uYtOBQ9jWqgMsoo+DvWFrBk4eExC9jGRk7Y/aWw6lx+TCbISGYztkhREQf73JKjbejoxLXf9h9gfo3MpPdrQhzMd2zVKOgSNf8FddZA= + +script: + - sbt -J-Xmx6144m ++$TRAVIS_SCALA_VERSION $COMMAND +after_success: + - ./project/publish + services: - docker + before_install: - sudo service memcached stop - docker pull memcached - docker run -d -p 127.0.0.1:11211:11211 memcached memcached -script: "sbt clean coverage test" -after_success: "sbt coverageReport coveralls" + cache: directories: - $HOME/.sbt/0.13 diff --git a/README.md b/README.md index edfe6d6..1390849 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # Shade - Memcached Client for Scala [![Build Status](https://travis-ci.org/monix/shade.svg?branch=master)](https://travis-ci.org/monix/shade) -[![Coverage Status](https://coveralls.io/repos/monix/shade/badge.svg?branch=master&service=github)](https://coveralls.io/github/alexandru/shade?branch=master) [![Join the chat at https://gitter.im/monix/shade](https://badges.gitter.im/monix/shade.svg)](https://gitter.im/monix/shade?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) ## Overview @@ -23,6 +22,7 @@ Supported for Scala versions: 2.10, 2.11 and 2.12. ## Release Notes +- [Version 1.10.x](release-notes/1.10.md) - [Version 1.9.x](release-notes/1.9.md) - [Version 1.8.x](release-notes/1.8.md) - [Version 1.7.x](release-notes/1.7.md) @@ -38,7 +38,7 @@ These are the people maintaining this project that you can annoy: ## Usage From SBT ```scala -dependencies += "io.monix" %% "shade" % "1.9.5" +dependencies += "io.monix" %% "shade" % "1.10.0" ``` ### Initializing the Memcached Client diff --git a/build.sbt b/build.sbt index bfd1edf..2a30555 100644 --- a/build.sbt +++ b/build.sbt @@ -1,13 +1,11 @@ name := "shade" - -version := "1.9.5" - organization := "io.monix" -scalaVersion := "2.11.11" - -crossScalaVersions := Seq("2.10.6", "2.11.11", "2.12.2") +addCommandAlias("ci", ";clean ;compile ;test ;package") +addCommandAlias("release", ";+publishSigned ;sonatypeReleaseAll") +scalaVersion := "2.11.11" +crossScalaVersions := Seq("2.10.6", "2.11.11", "2.12.3") compileOrder in ThisBuild := CompileOrder.JavaThenScala scalacOptions ++= { @@ -56,9 +54,7 @@ scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match { scalacOptions in (Compile, doc) ~= (_ filterNot (_ == "-Xfatal-warnings")) resolvers ++= Seq( - "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases", - "Spy" at "http://files.couchbase.com/maven2/", - Resolver.sonatypeRepo("snapshots") + "Spy" at "http://files.couchbase.com/maven2/" ) libraryDependencies ++= Seq( @@ -72,47 +68,76 @@ libraryDependencies ++= Seq( libraryDependencies += ("org.scala-lang" % "scala-reflect" % scalaVersion.value % "compile") -// -- Settings meant for deployment on oss.sonatype.org +//------------- For Release -useGpg := true -useGpgAgent := true +useGpg := false usePgpKeyHex("2673B174C4071B0E") +pgpPublicRing := baseDirectory.value / "project" / ".gnupg" / "pubring.gpg" +pgpSecretRing := baseDirectory.value / "project" / ".gnupg" / "secring.gpg" +pgpPassphrase := sys.env.get("PGP_PASS").map(_.toArray) + +enablePlugins(GitVersioning) + +/* The BaseVersion setting represents the in-development (upcoming) version, + * as an alternative to SNAPSHOTS. + */ +git.baseVersion := "1.11.0" + +val ReleaseTag = """^v([\d\.]+)$""".r +git.gitTagToVersionNumber := { + case ReleaseTag(v) => Some(v) + case _ => None +} + +git.formattedShaVersion := { + val suffix = git.makeUncommittedSignifierSuffix(git.gitUncommittedChanges.value, git.uncommittedSignifier.value) + + git.gitHeadCommit.value map { _.substring(0, 7) } map { sha => + git.baseVersion.value + "-" + sha + suffix + } +} + +sonatypeProfileName := organization.value + +credentials += Credentials( + "Sonatype Nexus Repository Manager", + "oss.sonatype.org", + sys.env.getOrElse("SONATYPE_USER", ""), + sys.env.getOrElse("SONATYPE_PASS", "") +) publishMavenStyle := true -publishTo := { - val nexus = "https://oss.sonatype.org/" +isSnapshot := version.value endsWith "SNAPSHOT" + +publishTo := Some( if (isSnapshot.value) - Some("snapshots" at nexus + "content/repositories/snapshots") + Opts.resolver.sonatypeSnapshots else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} + Opts.resolver.sonatypeStaging +) publishArtifact in Test := false pomIncludeRepository := { _ => false } // removes optional dependencies scalariformSettings -pomExtra in ThisBuild := - https://github.com/monix/shade - - - The MIT License - http://opensource.org/licenses/MIT - repo - - - - git@github.com:monix/shade.git - scm:git:git@github.com:monix/shade.git - - - - alex_ndc - Alexandru Nedelcu - https://alexn.org - - +licenses := Seq("MIT" -> url("https://opensource.org/licenses/MIT")) +homepage := Some(url("https://github.com/monix/shade")) + +scmInfo := Some( + ScmInfo( + url("https://github.com/monix/shade.git"), + "scm:git@github.com:monix/shade.git" + )) + +developers := List( + Developer( + id="alexelcu", + name="Alexandru Nedelcu", + email="noreply@alexn.org", + url=url("https://alexn.org") + )) // Multi-project-related diff --git a/project/.gnupg/pubring.gpg b/project/.gnupg/pubring.gpg new file mode 100644 index 0000000..a7a537e Binary files /dev/null and b/project/.gnupg/pubring.gpg differ diff --git a/project/.gnupg/secring.gpg b/project/.gnupg/secring.gpg new file mode 100644 index 0000000..3a7a327 Binary files /dev/null and b/project/.gnupg/secring.gpg differ diff --git a/project/plugins.sbt b/project/plugins.sbt index 939dc20..a29f8ca 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,11 +1,7 @@ resolvers += Classpaths.sbtPluginReleases -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") - +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.1") addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0") - -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0") - -addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.1.0") - -addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.18") \ No newline at end of file +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.18") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.1") +addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.9.3") diff --git a/project/publish b/project/publish new file mode 100755 index 0000000..2f196c8 --- /dev/null +++ b/project/publish @@ -0,0 +1,43 @@ +#!/usr/bin/env ruby + +def exec(cmd) + abort("Error encountered, aborting") unless system(cmd) +end + +puts "CI=#{ENV['CI']}" +puts "TRAVIS_BRANCH=#{ENV['TRAVIS_BRANCH']}" +puts "TRAVIS_PULL_REQUEST=#{ENV['TRAVIS_PULL_REQUEST']}" +puts "PUBLISH=#{ENV['PUBLISH']}" +puts "SONATYPE_USER=xxxx" if ENV['SONATYPE_USER'] +puts "SONATYPE_PASS=xxxx" if ENV['SONATYPE_PASS'] +puts "PGP_PASS=xxxx" if ENV['PGP_PASS'] +puts + +unless ENV['CI'] == 'true' + abort("ERROR: Not running on top of Travis, aborting!") +end + +unless ENV['PUBLISH'] == 'true' + puts "Publish is disabled" + exit +end + +branch = ENV['TRAVIS_BRANCH'] +version = nil + +unless branch =~ /^v(\d+\.\d+\.\d+)$/ || + (branch == "snapshot" && ENV['TRAVIS_PULL_REQUEST'] == 'false') + + puts "Only deploying docs on the `publish` branch, or for version tags " + + "and not for pull requests or other branches, exiting!" + exit 0 +else + version = $1 + puts "Version branch detected: #{version}" if version +end + +# Forcing a change to the root directory, if not there already +Dir.chdir(File.absolute_path(File.join(File.dirname(__FILE__), ".."))) + +# Go, go, go +exec("sbt release") diff --git a/release-notes/1.10.md b/release-notes/1.10.md new file mode 100644 index 0000000..7741666 --- /dev/null +++ b/release-notes/1.10.md @@ -0,0 +1,7 @@ +# Version 1.10.0 - Aug 16, 2017 + +- Update SBT to 0.13.15 +- Update Scala versions +- Update SpyMemcached +- Add ability to configure the `timeoutThreshold` +- Configure project for automatic releases from Travis diff --git a/src/main/scala/shade/memcached/Codec.scala b/src/main/scala/shade/memcached/Codec.scala index 74994c2..21adf43 100644 --- a/src/main/scala/shade/memcached/Codec.scala +++ b/src/main/scala/shade/memcached/Codec.scala @@ -11,140 +11,191 @@ package shade.memcached -import java.io._ +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, Closeable, ObjectOutputStream} + +import monix.execution.misc.NonFatal +import net.spy.memcached.CachedData +import net.spy.memcached.transcoders._ import scala.annotation.implicitNotFound -import scala.language.implicitConversions +import net.spy.memcached.CachedData.MAX_SIZE + import scala.reflect.ClassTag -import scala.util.control.NonFatal /** - * Represents a type class that needs to be implemented - * for serialization/deserialization to work. - */ -@implicitNotFound("Could not find any Codec implementation for type ${T}. Please provide one or import shade.memcached.MemcachedCodecs._") -trait Codec[T] { - def serialize(value: T): Array[Byte] - def deserialize(data: Array[Byte]): T + * Represents a type class that needs to be implemented + * for serialization/deserialization to work. + */ +@implicitNotFound("Could not find any Codec implementation for type ${T}.") +trait Codec[T] extends Transcoder[T] { + /** + * Returns `true` if the decoding needs to happen asynchronously, + * or `false` otherwise. + * + * Decoding should be marked for asynchrony in case it is + * expensive, for example when compression is applied. + */ + def asyncDecode(d: CachedData): Boolean + + /** + * Encode the given value to a byte array with flags attached, + * meant for storage by the Memcached client. + */ + def encode(value: T): CachedData + + /** + * Decodes byte arrays with flags, as retrieved by the Memcached client, + * into the value it represents. + */ + def decode(data: CachedData): T + + /** Get the maximum size of objects handled by this codec. */ + def getMaxSize: Int } -object Codec extends BaseCodecs - -trait BaseCodecs { - implicit object IntBinaryCodec extends Codec[Int] { - def serialize(value: Int): Array[Byte] = - Array( - (value >>> 24).asInstanceOf[Byte], - (value >>> 16).asInstanceOf[Byte], - (value >>> 8).asInstanceOf[Byte], - value.asInstanceOf[Byte] - ) - - def deserialize(data: Array[Byte]): Int = - (data(0).asInstanceOf[Int] & 255) << 24 | - (data(1).asInstanceOf[Int] & 255) << 16 | - (data(2).asInstanceOf[Int] & 255) << 8 | - data(3).asInstanceOf[Int] & 255 - } +object Codec extends DefaultCodecs - implicit object DoubleBinaryCodec extends Codec[Double] { - import java.lang.{ Double => JvmDouble } - def serialize(value: Double): Array[Byte] = { - val l = JvmDouble.doubleToLongBits(value) - LongBinaryCodec.serialize(l) - } +trait DefaultCodecs extends DefaultCodecsLevel0 { + + import java.lang.{Float => JvmFloat, Double => JvmDouble} + + /** Default codec for `Short`. */ + implicit object ShortBinaryCodec extends GenericIntCodec[Short]( + flags = 2 << 8, // SerializingTranscoder.SPECIAL_INT + toInt = (v: Short) => v.toInt, + fromInt = (v: Int) => v.toShort + ) + + /** Default codec for `Char`. */ + implicit object CharBinaryCodec extends GenericIntCodec[Char]( + flags = 2 << 8, // SerializingTranscoder.SPECIAL_INT + toInt = (v: Char) => v.toInt, + fromInt = (v: Int) => v.toChar + ) + + /** Default codec for `Int`. */ + implicit object IntBinaryCodec extends GenericIntCodec[Int]( + flags = 2 << 8, // SerializingTranscoder.SPECIAL_INT + toInt = (v: Int) => v, + fromInt = (v: Int) => v + ) + + /** Default codec for `Long`. */ + implicit object LongBinaryCodec extends GenericLongCodec[Long]( + flags = 3 << 8, // SerializingTranscoder.SPECIAL_LONG + toLong = (v: Long) => v, + fromLong = (v: Long) => v + ) - def deserialize(data: Array[Byte]): Double = { - val l = LongBinaryCodec.deserialize(data) - JvmDouble.longBitsToDouble(l) + /** Default codec for `Float`. */ + implicit object FloatBinaryCodec extends GenericIntCodec[Float]( + flags = 6 << 8, // SerializingTranscoder.SPECIAL_FLOAT + toInt = JvmFloat.floatToRawIntBits, + fromInt = JvmFloat.intBitsToFloat + ) + + /** Default codec for `Double`. */ + implicit object DoubleBinaryCodec extends GenericLongCodec[Double]( + flags = 7 << 8, // SerializingTranscoder.SPECIAL_DOUBLE + toLong = JvmDouble.doubleToRawLongBits, + fromLong = JvmDouble.longBitsToDouble + ) + + /** Default codec for `Byte`. */ + implicit object ByteBinaryCodec extends Codec[Byte] { + final val FLAGS = 5 << 8 // SerializingTranscoder.SPECIAL_BYTE + + def asyncDecode(d: CachedData): Boolean = false + + def encode(value: Byte): CachedData = { + val bytes = packedUtils.encodeByte(value) + new CachedData(FLAGS, bytes, getMaxSize) } + + def decode(data: CachedData): Byte = + data.getData match { + case null => 0 + case bytes => + packedUtils.decodeByte(bytes) + } + + def getMaxSize: Int = + MAX_SIZE } - implicit object FloatBinaryCodec extends Codec[Float] { - import java.lang.{ Float => JvmFloat } - def serialize(value: Float): Array[Byte] = { - val i = JvmFloat.floatToIntBits(value) - IntBinaryCodec.serialize(i) - } + /** Default codec for `Boolean`. */ + implicit object BooleanCodec extends Codec[Boolean] { + // SerializingTranscoder.SPECIAL_BOOLEAN + final val FLAGS = 1 << 8 + + def asyncDecode(d: CachedData): Boolean = false - def deserialize(data: Array[Byte]): Float = { - val i = IntBinaryCodec.deserialize(data) - JvmFloat.intBitsToFloat(i) + def encode(value: Boolean): CachedData = { + val bytes = packedUtils.encodeBoolean(value) + new CachedData(FLAGS, bytes, getMaxSize) } - } - implicit object LongBinaryCodec extends Codec[Long] { - def serialize(value: Long): Array[Byte] = - Array( - (value >>> 56).asInstanceOf[Byte], - (value >>> 48).asInstanceOf[Byte], - (value >>> 40).asInstanceOf[Byte], - (value >>> 32).asInstanceOf[Byte], - (value >>> 24).asInstanceOf[Byte], - (value >>> 16).asInstanceOf[Byte], - (value >>> 8).asInstanceOf[Byte], - value.asInstanceOf[Byte] - ) - - def deserialize(data: Array[Byte]): Long = - (data(0).asInstanceOf[Long] & 255) << 56 | - (data(1).asInstanceOf[Long] & 255) << 48 | - (data(2).asInstanceOf[Long] & 255) << 40 | - (data(3).asInstanceOf[Long] & 255) << 32 | - (data(4).asInstanceOf[Long] & 255) << 24 | - (data(5).asInstanceOf[Long] & 255) << 16 | - (data(6).asInstanceOf[Long] & 255) << 8 | - data(7).asInstanceOf[Long] & 255 + def decode(data: CachedData): Boolean = + data.getData match { + case null => false + case bytes => + packedUtils.decodeBoolean(bytes) + } + + def getMaxSize: Int = + MAX_SIZE } - implicit object BooleanBinaryCodec extends Codec[Boolean] { - def serialize(value: Boolean): Array[Byte] = - Array((if (value) 1 else 0).asInstanceOf[Byte]) - def deserialize(data: Array[Byte]): Boolean = - data.isDefinedAt(0) && data(0) == 1 - } + implicit def ArrayBinaryCodec: Codec[Array[Byte]] = new Codec[Array[Byte]] { + private[this] val tc = new SerializingTranscoder() - implicit object CharBinaryCodec extends Codec[Char] { - def serialize(value: Char): Array[Byte] = Array( - (value >>> 8).asInstanceOf[Byte], - value.asInstanceOf[Byte] - ) + def asyncDecode(d: CachedData): Boolean = tc.asyncDecode(d) - def deserialize(data: Array[Byte]): Char = - ((data(0).asInstanceOf[Int] & 255) << 8 | - data(1).asInstanceOf[Int] & 255) - .asInstanceOf[Char] - } + def encode(value: Array[Byte]): CachedData = tc.encode(value) - implicit object ShortBinaryCodec extends Codec[Short] { - def serialize(value: Short): Array[Byte] = Array( - (value >>> 8).asInstanceOf[Byte], - value.asInstanceOf[Byte] - ) + def decode(data: CachedData): Array[Byte] = tc.decode(data).asInstanceOf[Array[Byte]] - def deserialize(data: Array[Byte]): Short = - ((data(0).asInstanceOf[Short] & 255) << 8 | - data(1).asInstanceOf[Short] & 255) - .asInstanceOf[Short] + def getMaxSize: Int = tc.getMaxSize } - implicit object StringBinaryCodec extends Codec[String] { - def serialize(value: String): Array[Byte] = value.getBytes("UTF-8") - def deserialize(data: Array[Byte]): String = new String(data, "UTF-8") - } + implicit def StringBinaryCodec: Codec[String] = new Codec[String] { + private[this] val tc = new SerializingTranscoder() + def asyncDecode(d: CachedData): Boolean = tc.asyncDecode(d) + + def encode(value: String): CachedData = tc.encode(value) + + def decode(data: CachedData): String = tc.decode(data).asInstanceOf[String] - implicit object ArrayByteBinaryCodec extends Codec[Array[Byte]] { - def serialize(value: Array[Byte]): Array[Byte] = value - def deserialize(data: Array[Byte]): Array[Byte] = data + def getMaxSize: Int = tc.getMaxSize } + + } -trait GenericCodec { +private[memcached] trait DefaultCodecsLevel0 { private[this] class GenericCodec[S <: Serializable](classTag: ClassTag[S]) extends Codec[S] { - def using[T <: Closeable, R](obj: T)(f: T => R): R = + private[this] val tc = new SerializingTranscoder() + def asyncDecode(d: CachedData): Boolean = + tc.asyncDecode(d) + + def encode(value: S): CachedData = { + if (value == null) throw new NullPointerException("Null values not supported!") + tc.encode(serialize(value)) + } + + def decode(data: CachedData): S = + tc.decode(data) match { + case value: Array[Byte] => deserialize(value) + case _ => throw new NullPointerException("Null values not supported!") + } + + def getMaxSize: Int = + tc.getMaxSize + + private def using[T <: Closeable, R](obj: T)(f: T => R): R = try f(obj) finally @@ -152,30 +203,75 @@ trait GenericCodec { case NonFatal(_) => // does nothing } - def serialize(value: S): Array[Byte] = - using (new ByteArrayOutputStream()) { buf => - using (new ObjectOutputStream(buf)) { out => + private def serialize(value: S): Array[Byte] = + using(new ByteArrayOutputStream()) { buf => + using(new ObjectOutputStream(buf)) { out => out.writeObject(value) out.close() buf.toByteArray } } - def deserialize(data: Array[Byte]): S = - using (new ByteArrayInputStream(data)) { buf => + private def deserialize(data: Array[Byte]): S = + using(new ByteArrayInputStream(data)) { buf => val in = new GenericCodecObjectInputStream(classTag, buf) - using (in) { inp => + using(in) { inp => inp.readObject().asInstanceOf[S] } } } - implicit def AnyRefBinaryCodec[S <: Serializable](implicit ev: ClassTag[S]): Codec[S] = + implicit def serializingCodec[S <: Serializable](implicit ev: ClassTag[S]): Codec[S] = new GenericCodec[S](ev) -} -trait MemcachedCodecs extends BaseCodecs with GenericCodec + /** Helper for building codecs that serialize/deserialize to and from `Long`. */ + class GenericLongCodec[A](flags: Int, toLong: A => Long, fromLong: Long => A) extends Codec[A] { + final val FLAGS = flags + + final def asyncDecode(d: CachedData): Boolean = + false + + final def encode(value: A): CachedData = { + val bytes = packedUtils.encodeLong(toLong(value)) + new CachedData(FLAGS, bytes, MAX_SIZE) + } + + final def decode(data: CachedData): A = + fromLong(data.getData match { + case null => 0 + case bytes => + packedUtils.decodeLong(bytes) + }) + + final def getMaxSize: Int = + MAX_SIZE + } + + /** Helper for building codecs that serialize/deserialize to and from `Int`. */ + class GenericIntCodec[A](flags: Int, toInt: A => Int, fromInt: Int => A) extends Codec[A] { + final val FLAGS = flags + + final def asyncDecode(d: CachedData): Boolean = + false + + final def encode(value: A): CachedData = { + val bytes = packedUtils.encodeInt(toInt(value)) + new CachedData(FLAGS, bytes, MAX_SIZE) + } + + final def decode(data: CachedData): A = + fromInt(data.getData match { + case null => 0 + case bytes => + packedUtils.decodeInt(bytes) + }) + + final def getMaxSize: Int = + MAX_SIZE + } -object MemcachedCodecs extends MemcachedCodecs + protected final val packedUtils = + new TranscoderUtils(true) +} \ No newline at end of file diff --git a/src/main/scala/shade/memcached/FakeMemcached.scala b/src/main/scala/shade/memcached/FakeMemcached.scala index f15411a..25d01f1 100644 --- a/src/main/scala/shade/memcached/FakeMemcached.scala +++ b/src/main/scala/shade/memcached/FakeMemcached.scala @@ -12,6 +12,7 @@ package shade.memcached import monix.execution.CancelableFuture +import net.spy.memcached.CachedData import shade.UnhandledStatusException import shade.inmemory.InMemoryCache @@ -26,7 +27,7 @@ class FakeMemcached(context: ExecutionContext) extends Memcached { case null => CancelableFuture.successful(false) case _ => - CancelableFuture.successful(cache.add(key, codec.serialize(value).toSeq, exp)) + CancelableFuture.successful(cache.add[CachedData](key, codec.encode(value), exp)) } def set[T](key: String, value: T, exp: Duration)(implicit codec: Codec[T]): CancelableFuture[Unit] = @@ -34,52 +35,48 @@ class FakeMemcached(context: ExecutionContext) extends Memcached { case null => CancelableFuture.successful(()) case _ => - CancelableFuture.successful(cache.set(key, codec.serialize(value).toSeq, exp)) + CancelableFuture.successful(cache.set[CachedData](key, codec.encode(value), exp)) } def delete(key: String): CancelableFuture[Boolean] = CancelableFuture.successful(cache.delete(key)) def get[T](key: String)(implicit codec: Codec[T]): Future[Option[T]] = - Future.successful(cache.get[Seq[Byte]](key)).map(_.map(x => codec.deserialize(x.toArray))) + Future.successful(cache.get[CachedData](key).map(codec.decode)) def compareAndSet[T](key: String, expecting: Option[T], newValue: T, exp: Duration)(implicit codec: Codec[T]): Future[Boolean] = - Future.successful(cache.compareAndSet(key, expecting.map(x => codec.serialize(x).toSeq), codec.serialize(newValue).toSeq, exp)) + Future.successful { + val current = cache.get[CachedData](key) + if (current.map(codec.decode) == expecting) { + cache.set(key, codec.encode(newValue), exp) + true + } else { + false + } + } def transformAndGet[T](key: String, exp: Duration)(cb: (Option[T]) => T)(implicit codec: Codec[T]): Future[T] = - Future.successful(cache.transformAndGet[Seq[Byte]](key: String, exp) { current => - val cValue = current.map(x => codec.deserialize(x.toArray)) - val update = cb(cValue) - codec.serialize(update).toSeq - }) map { update => - codec.deserialize(update.toArray) - } + Future.successful(cache.transformAndGet[CachedData](key, exp)(o => codec.encode(cb(o.map(codec.decode))))).map(codec.decode) def getAndTransform[T](key: String, exp: Duration)(cb: (Option[T]) => T)(implicit codec: Codec[T]): Future[Option[T]] = - Future.successful(cache.getAndTransform[Seq[Byte]](key: String, exp) { current => - val cValue = current.map(x => codec.deserialize(x.toArray)) - val update = cb(cValue) - codec.serialize(update).toSeq - }) map { update => - update.map(x => codec.deserialize(x.toArray)) - } + Future.successful(cache.getAndTransform[CachedData](key: String, exp)(o => codec.encode(cb(o.map(codec.decode))))).map(c => c.map(codec.decode)) def increment(key: String, by: Long, default: Option[Long], exp: Duration): Future[Long] = { - def toBigInt(bytes: Seq[Byte]): BigInt = BigInt(new String(bytes.toArray)) - Future.successful(cache.transformAndGet[Seq[Byte]](key, exp) { - case Some(current) => (toBigInt(current) + by).toString.getBytes - case None if default.isDefined => default.get.toString.getBytes + def toBigInt(bytes: Array[Byte]): BigInt = BigInt(new String(bytes)) + Future.successful(cache.transformAndGet[CachedData](key, exp) { + case Some(current) => new CachedData(0, (toBigInt(current.getData) + by).toString.getBytes, Int.MaxValue) + case None if default.isDefined => new CachedData(0, default.get.toString.getBytes, Int.MaxValue) case None => throw new UnhandledStatusException(s"For key $key - CASNotFoundStatus") - }).map(toBigInt).map(_.toLong) + }).map(c => toBigInt(c.getData)).map(_.toLong) } def decrement(key: String, by: Long, default: Option[Long], exp: Duration): Future[Long] = { - def toBigInt(bytes: Seq[Byte]): BigInt = BigInt(new String(bytes.toArray)) - Future.successful(cache.transformAndGet[Seq[Byte]](key, exp) { - case Some(current) => (toBigInt(current) - by).max(0).toString.getBytes - case None if default.isDefined => default.get.toString.getBytes + def toBigInt(bytes: Array[Byte]): BigInt = BigInt(new String(bytes)) + Future.successful(cache.transformAndGet[CachedData](key, exp) { + case Some(current) => new CachedData(0, (toBigInt(current.getData) - by).max(0).toString.getBytes, Int.MaxValue) + case None if default.isDefined => new CachedData(0, default.get.toString.getBytes, Int.MaxValue) case None => throw new UnhandledStatusException(s"For key $key - CASNotFoundStatus") - }).map(toBigInt).map(_.toLong) + }).map(c => toBigInt(c.getData)).map(_.toLong) } def close(): Unit = { diff --git a/src/main/scala/shade/memcached/MemcachedImpl.scala b/src/main/scala/shade/memcached/MemcachedImpl.scala index ab7de5d..efb5034 100644 --- a/src/main/scala/shade/memcached/MemcachedImpl.scala +++ b/src/main/scala/shade/memcached/MemcachedImpl.scala @@ -48,7 +48,7 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach case null => CancelableFuture.successful(false) case _ => - instance.realAsyncAdd(withPrefix(key), codec.serialize(value), 0, exp, config.operationTimeout) map { + instance.realAsyncAdd(withPrefix(key), codec.encode(value), exp, config.operationTimeout) map { case SuccessfulResult(givenKey, Some(_)) => true case SuccessfulResult(givenKey, None) => @@ -68,7 +68,7 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach case null => CancelableFuture.successful(()) case _ => - instance.realAsyncSet(withPrefix(key), codec.serialize(value), 0, exp, config.operationTimeout) map { + instance.realAsyncSet(withPrefix(key), codec.encode(value), exp, config.operationTimeout) map { case SuccessfulResult(givenKey, _) => () case failure: FailedResult => @@ -97,7 +97,7 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach def get[T](key: String)(implicit codec: Codec[T]): Future[Option[T]] = instance.realAsyncGet(withPrefix(key), config.operationTimeout) map { case SuccessfulResult(givenKey, option) => - option.map(codec.deserialize) + option.map(codec.decode) case failure: FailedResult => throwExceptionOn(failure) } @@ -126,8 +126,8 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach Future.successful(false) case SuccessfulResult(givenKey, Some((currentData, casID))) => - if (codec.deserialize(currentData) == expectingValue) - instance.realAsyncCAS(withPrefix(key), casID, 0, codec.serialize(newValue), exp, config.operationTimeout) map { + if (codec.decode(currentData) == expectingValue) + instance.realAsyncCAS(withPrefix(key), casID, codec.encode(newValue), exp, config.operationTimeout) map { case SuccessfulResult(_, bool) => bool case failure: FailedResult => @@ -172,10 +172,10 @@ class MemcachedImpl(config: Configuration, ec: ExecutionContext) extends Memcach loop(retry + 1) } case SuccessfulResult(_, Some((current, casID))) => - val currentOpt = Some(codec.deserialize(current)) + val currentOpt = Some(codec.decode(current)) val result = cb(currentOpt) - instance.realAsyncCAS(keyWithPrefix, casID, 0, codec.serialize(result), exp, remainingTime.millis) flatMap { + instance.realAsyncCAS(keyWithPrefix, casID, codec.encode(result), exp, remainingTime.millis) flatMap { case SuccessfulResult(_, true) => Future.successful(f(currentOpt, result)) case SuccessfulResult(_, false) => diff --git a/src/main/scala/shade/memcached/internals/SpyMemcachedIntegration.scala b/src/main/scala/shade/memcached/internals/SpyMemcachedIntegration.scala index 41d49f9..c273ac5 100644 --- a/src/main/scala/shade/memcached/internals/SpyMemcachedIntegration.scala +++ b/src/main/scala/shade/memcached/internals/SpyMemcachedIntegration.scala @@ -178,9 +178,9 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres } } - def realAsyncGet(key: String, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[Array[Byte]]]] = { - val promise = Promise[Result[Option[Array[Byte]]]]() - val result = new MutablePartialResult[Option[Array[Byte]]] + def realAsyncGet(key: String, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[CachedData]]] = { + val promise = Promise[Result[Option[CachedData]]]() + val result = new MutablePartialResult[Option[CachedData]] val op: GetOperation = opFact.get(key, new GetOperation.Callback { def receivedStatus(opStatus: OperationStatus) { @@ -193,7 +193,7 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres def gotData(k: String, flags: Int, data: Array[Byte]) { assert(key == k, "Wrong key returned") - result.tryComplete(Success(SuccessfulResult(key, Option(data)))) + result.tryComplete(Success(SuccessfulResult(key, Option(new CachedData(flags, data, data.length))))) } def complete() { @@ -205,11 +205,11 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres prepareFuture(key, op, promise, timeout) } - def realAsyncSet(key: String, data: Array[Byte], flags: Int, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Long]] = { + def realAsyncSet(key: String, cachedData: CachedData, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Long]] = { val promise = Promise[Result[Long]]() val result = new MutablePartialResult[Long] - val op: Operation = opFact.store(StoreType.set, key, flags, expiryToSeconds(exp).toInt, data, new StoreOperation.Callback { + val op: Operation = opFact.store(StoreType.set, key, cachedData.getFlags, expiryToSeconds(exp).toInt, cachedData.getData, new StoreOperation.Callback { def receivedStatus(opStatus: OperationStatus) { handleStatus(opStatus, key, result) { case CASSuccessStatus => @@ -229,11 +229,11 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres prepareFuture(key, op, promise, timeout) } - def realAsyncAdd(key: String, data: Array[Byte], flags: Int, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[Long]]] = { + def realAsyncAdd(key: String, cachedData: CachedData, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[Long]]] = { val promise = Promise[Result[Option[Long]]]() val result = new MutablePartialResult[Option[Long]] - val op: Operation = opFact.store(StoreType.add, key, flags, expiryToSeconds(exp).toInt, data, new StoreOperation.Callback { + val op: Operation = opFact.store(StoreType.add, key, cachedData.getFlags, expiryToSeconds(exp).toInt, cachedData.getData, new StoreOperation.Callback { def receivedStatus(opStatus: OperationStatus) { handleStatus(opStatus, key, result) { case CASExistsStatus => @@ -280,9 +280,9 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres prepareFuture(key, op, promise, timeout) } - def realAsyncGets(key: String, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[(Array[Byte], Long)]]] = { - val promise = Promise[Result[Option[(Array[Byte], Long)]]]() - val result = new MutablePartialResult[Option[(Array[Byte], Long)]] + def realAsyncGets(key: String, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Option[(CachedData, Long)]]] = { + val promise = Promise[Result[Option[(CachedData, Long)]]]() + val result = new MutablePartialResult[Option[(CachedData, Long)]] val op: Operation = opFact.gets(key, new GetsOperation.Callback { def receivedStatus(opStatus: OperationStatus) { @@ -298,7 +298,7 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres assert(cas > 0, s"CAS was less than zero: $cas") result.tryComplete(Try { - SuccessfulResult(key, Option(data).map(d => (d, cas))) + SuccessfulResult(key, Option(data).map(d => (new CachedData(flags, data, data.length), cas))) }) } @@ -311,11 +311,11 @@ class SpyMemcachedIntegration(cf: ConnectionFactory, addrs: Seq[InetSocketAddres prepareFuture(key, op, promise, timeout) } - def realAsyncCAS(key: String, casID: Long, flags: Int, data: Array[Byte], exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Boolean]] = { + def realAsyncCAS(key: String, casID: Long, cachedData: CachedData, exp: Duration, timeout: FiniteDuration)(implicit ec: ExecutionContext): CancelableFuture[Result[Boolean]] = { val promise = Promise[Result[Boolean]]() val result = new MutablePartialResult[Boolean] - val op = opFact.cas(StoreType.set, key, casID, flags, expiryToSeconds(exp).toInt, data, new StoreOperation.Callback { + val op = opFact.cas(StoreType.set, key, casID, cachedData.getFlags, expiryToSeconds(exp).toInt, cachedData.getData, new StoreOperation.Callback { def receivedStatus(opStatus: OperationStatus) { handleStatus(opStatus, key, result) { case CASSuccessStatus => diff --git a/src/test/scala/shade/tests/CodecsSuite.scala b/src/test/scala/shade/memcached/CodecsSuite.scala similarity index 77% rename from src/test/scala/shade/tests/CodecsSuite.scala rename to src/test/scala/shade/memcached/CodecsSuite.scala index 412aad8..a48d712 100644 --- a/src/test/scala/shade/tests/CodecsSuite.scala +++ b/src/test/scala/shade/memcached/CodecsSuite.scala @@ -9,22 +9,21 @@ * https://github.com/monix/shade/blob/master/LICENSE.txt */ -package shade.tests +package shade.memcached import org.scalacheck.Arbitrary import org.scalatest.FunSuite import org.scalatest.prop.GeneratorDrivenPropertyChecks -import shade.memcached.{ Codec, MemcachedCodecs } -class CodecsSuite extends FunSuite with MemcachedCodecs with GeneratorDrivenPropertyChecks { +class CodecsSuite extends FunSuite with DefaultCodecsLevel0 with DefaultCodecs with GeneratorDrivenPropertyChecks { /** * Properties-based checking for a codec of type A */ private def serdesCheck[A: Arbitrary](codec: Codec[A]): Unit = { forAll { n: A => - val serialised = codec.serialize(n) - val deserialised = codec.deserialize(serialised) + val serialised = codec.encode(n) + val deserialised = codec.decode(serialised) assert(deserialised == n) } } @@ -46,7 +45,7 @@ class CodecsSuite extends FunSuite with MemcachedCodecs with GeneratorDrivenProp } test("BooleanBinaryCodec") { - serdesCheck(BooleanBinaryCodec) + serdesCheck(BooleanCodec) } test("CharBinaryCodec") { @@ -62,7 +61,11 @@ class CodecsSuite extends FunSuite with MemcachedCodecs with GeneratorDrivenProp } test("ArrayByteBinaryCodec") { - serdesCheck(ArrayByteBinaryCodec) + serdesCheck(ArrayBinaryCodec) + } + + test("ByteBinaryCodec") { + serdesCheck(ByteBinaryCodec) } } \ No newline at end of file diff --git a/src/test/scala/shade/tests/MemcachedTestHelpers.scala b/src/test/scala/shade/tests/MemcachedTestHelpers.scala index 5938561..f8d8b06 100644 --- a/src/test/scala/shade/tests/MemcachedTestHelpers.scala +++ b/src/test/scala/shade/tests/MemcachedTestHelpers.scala @@ -16,7 +16,7 @@ import shade.memcached._ import scala.concurrent.ExecutionContext.Implicits._ import scala.concurrent.duration._ -trait MemcachedTestHelpers extends MemcachedCodecs { +trait MemcachedTestHelpers extends DefaultCodecs { val defaultConfig = Configuration( addresses = "127.0.0.1:11211", authentication = None,