diff --git a/.gitignore b/.gitignore
index 0122f1ca..1aaf8978 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
+databases/*
out/*
generate_bundles.rb
.cache
diff --git a/.travis.yml b/.travis.yml
index 53e614cf..2c1a7a84 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,13 +1,19 @@
language: scala
scala:
- 2.10.4
- - 2.11.0
+ - 2.11.7
jdk:
- oraclejdk7
- - openjdk7
+ - oraclejdk8
services:
- postgresql
- mysql
+cache:
+ directories:
+ - vendor/bundle
+ - $HOME/.m2
+ - $HOME/.ivy2
+ - $HOME/.sbt
before_script:
- ./script/prepare_build.sh
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 48ef302b..6ab0d079 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,9 @@
**Table of Contents**
- [Changelog](#changelog)
+ - [0.2.17 - in progresss](#0217---in-progresss)
+ - [0.2.16 - 2015-01-04](#0216---2015-01-04)
+ - [0.2.15 - 2014-09-12](#0215---2014-09-12)
- [0.2.14 - 2014-08-30](#0214---2014-08-30)
- [0.2.13 - 2014-04-07](#0213---2014-04-07)
- [0.2.12 - 2014-01-11](#0212---2014-01-11)
@@ -22,7 +25,25 @@
# Changelog
-## 0.2.15 - still in progress
+## 0.2.18 - 2015-08-08
+
+* Timeouts implemented queries for MySQL and PostgreSQL - @lifey - #147
+
+## 0.2.17 - 2015-07-13
+
+* Fixed pool leak issue - @haski
+* Fixed date time formatting issue - #142
+
+## 0.2.16 - 2015-01-04
+
+* Add support to byte arrays for PostgreSQL 8 and older - @SattaiLanfear - #21;
+* Make sure connections are returned to the pool before the result is returned to the user - @haski - #119;
+* Support to `SEND_LONG_DATA` to MySQL - @mst-appear - #115;
+* Support for `ByteBuffer` and `ByteBuf` for binary data - @mst-appear - #113 #112;
+* Fixed encoding backslashes in PostgreSQL arrays - @dylex - #110;
+* Included `escape` encoding method for bytes in PostgreSQL - @SattaiLanfear - #107;
+
+## 0.2.15 - 2014-09-12
* Fixes issue where PostgreSQL decoders fail to produce a NULL value if the null is wrapped by a `Some` instance - #99;
* Fixes issue where the 253 case of length encoded fields on MySQL produce a wrong value;
diff --git a/Procfile b/Procfile
index 6c1b0717..1288bcfe 100644
--- a/Procfile
+++ b/Procfile
@@ -1,2 +1,2 @@
-postgresql: postgres -D /Users/mauricio/databases/postgresql
+postgresql: postgres -D databases/postgresql
mysql: mysqld --log-warnings --console
\ No newline at end of file
diff --git a/README.markdown b/README.markdown
index 82f05d6a..42f549d8 100644
--- a/README.markdown
+++ b/README.markdown
@@ -17,6 +17,7 @@
- [Prepared statements](#prepared-statements)
- [Transactions](#transactions)
- [Example usage (for PostgreSQL, but it looks almost the same on MySQL)](#example-usage-for-postgresql-but-it-looks-almost-the-same-on-mysql)
+ - [LISTEN/NOTIFY support (PostgreSQL only)](#listennotify-support-postgresql-only)
- [Contributing](#contributing)
- [Licence](#licence)
@@ -45,13 +46,15 @@ You can view the project's [CHANGELOG here](CHANGELOG.md).
driver allowing you to write less SQL and make use of a nice high level database access API;
* [mod-mysql-postgresql](https://github.com/vert-x/mod-mysql-postgresql) - [vert.x](http://vertx.io/) module that integrates
the driver into a vert.x application;
+* [dbmapper](https://github.com/njeuk/dbmapper) - enables SQL queries with automatic mapping from the database table to the Scala
+ class and a mechanism to create a Table Date Gateway model with very little boiler plate code;
## Include them as dependencies
And if you're in a hurry, you can include them in your build like this, if you're using PostgreSQL:
```scala
-"com.github.mauricio" %% "postgresql-async" % "0.2.14"
+"com.github.mauricio" %% "postgresql-async" % "0.2.18"
```
Or Maven:
@@ -60,14 +63,14 @@ Or Maven:
com.github.mauricio
postgresql-async_2.11
- 0.2.14
+ 0.2.18
```
And if you're into MySQL:
```scala
-"com.github.mauricio" %% "mysql-async" % "0.2.14"
+"com.github.mauricio" %% "mysql-async" % "0.2.18"
```
Or Maven:
@@ -76,7 +79,7 @@ Or Maven:
com.github.mauricio
mysql-async_2.11
- 0.2.14
+ 0.2.18
```
@@ -267,6 +270,21 @@ disconnect and the connection is closed.
You can also use the `ConnectionPool` provided by the driver to simplify working with database connections in your app.
Check the blog post above for more details and the project's ScalaDocs.
+## LISTEN/NOTIFY support (PostgreSQL only)
+
+LISTEN/NOTIFY is a PostgreSQL-specific feature for database-wide publish-subscribe scenarios. You can listen to database
+notifications as such:
+
+```scala
+ val connection: Connection = ...
+
+ connection.sendQuery("LISTEN my_channel")
+ connection.registerNotifyListener {
+ message =>
+ println(s"channel: ${message.channel}, payload: ${message.payload}")
+ }
+```
+
## Contributing
Contributing to the project is simple, fork it on Github, hack on what you're insterested in seeing done or at the
diff --git a/README.md b/README.md
new file mode 100644
index 00000000..4fedd098
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# postgresql-async
+
+[](https://gitter.im/mauricio/postgresql-async?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
\ No newline at end of file
diff --git a/db-async-common/src/main/scala/com/github/mauricio/async/db/Configuration.scala b/db-async-common/src/main/scala/com/github/mauricio/async/db/Configuration.scala
index b9d3041f..841999e1 100644
--- a/db-async-common/src/main/scala/com/github/mauricio/async/db/Configuration.scala
+++ b/db-async-common/src/main/scala/com/github/mauricio/async/db/Configuration.scala
@@ -17,11 +17,10 @@
package com.github.mauricio.async.db
import java.nio.charset.Charset
-import scala.Predef._
-import scala.{None, Option, Int}
+
+import io.netty.buffer.{ByteBufAllocator, PooledByteBufAllocator}
import io.netty.util.CharsetUtil
-import io.netty.buffer.AbstractByteBufAllocator
-import io.netty.buffer.PooledByteBufAllocator
+
import scala.concurrent.duration._
object Configuration {
@@ -44,6 +43,11 @@ object Configuration {
* OOM or eternal loop attacks the client could have, defaults to 16 MB. You can set this
* to any value you would like but again, make sure you know what you are doing if you do
* change it.
+ * @param allocator the netty buffer allocator to be used
+ * @param connectTimeout the timeout for connecting to servers
+ * @param testTimeout the timeout for connection tests performed by pools
+ * @param queryTimeout the optional query timeout
+ *
*/
case class Configuration(username: String,
@@ -53,7 +57,7 @@ case class Configuration(username: String,
database: Option[String] = None,
charset: Charset = Configuration.DefaultCharset,
maximumMessageSize: Int = 16777216,
- allocator: AbstractByteBufAllocator = PooledByteBufAllocator.DEFAULT,
+ allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT,
connectTimeout: Duration = 5.seconds,
- testTimeout: Duration = 5.seconds
- )
+ testTimeout: Duration = 5.seconds,
+ queryTimeout: Option[Duration] = None)
diff --git a/db-async-common/src/main/scala/com/github/mauricio/async/db/column/TimeEncoderDecoder.scala b/db-async-common/src/main/scala/com/github/mauricio/async/db/column/TimeEncoderDecoder.scala
index 9a801775..a7d0c879 100644
--- a/db-async-common/src/main/scala/com/github/mauricio/async/db/column/TimeEncoderDecoder.scala
+++ b/db-async-common/src/main/scala/com/github/mauricio/async/db/column/TimeEncoderDecoder.scala
@@ -33,14 +33,16 @@ class TimeEncoderDecoder extends ColumnEncoderDecoder {
.appendOptional(optional)
.toFormatter
+ final private val printer = new DateTimeFormatterBuilder()
+ .appendPattern("HH:mm:ss.SSSSSS")
+ .toFormatter
+
def formatter = format
- override def decode(value: String): LocalTime = {
+ override def decode(value: String): LocalTime =
format.parseLocalTime(value)
- }
- override def encode(value: Any): String = {
- this.format.print(value.asInstanceOf[LocalTime])
- }
+ override def encode(value: Any): String =
+ this.printer.print(value.asInstanceOf[LocalTime])
}
diff --git a/db-async-common/src/main/scala/com/github/mauricio/async/db/column/UUIDEncoderDecoder.scala b/db-async-common/src/main/scala/com/github/mauricio/async/db/column/UUIDEncoderDecoder.scala
new file mode 100644
index 00000000..11987835
--- /dev/null
+++ b/db-async-common/src/main/scala/com/github/mauricio/async/db/column/UUIDEncoderDecoder.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2013 Maurício Linhares
+ *
+ * Maurício Linhares licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.mauricio.async.db.column
+
+import java.util.UUID
+
+object UUIDEncoderDecoder extends ColumnEncoderDecoder {
+
+ override def decode(value: String): UUID = UUID.fromString(value)
+
+}
diff --git a/db-async-common/src/main/scala/com/github/mauricio/async/db/exceptions/ConnectionTimeoutedException.scala b/db-async-common/src/main/scala/com/github/mauricio/async/db/exceptions/ConnectionTimeoutedException.scala
new file mode 100644
index 00000000..7e02c17c
--- /dev/null
+++ b/db-async-common/src/main/scala/com/github/mauricio/async/db/exceptions/ConnectionTimeoutedException.scala
@@ -0,0 +1,6 @@
+package com.github.mauricio.async.db.exceptions
+
+import com.github.mauricio.async.db.Connection
+
+class ConnectionTimeoutedException( val connection : Connection )
+ extends DatabaseException( "The connection %s has a timeouted query and is being closed".format(connection) )
\ No newline at end of file
diff --git a/db-async-common/src/main/scala/com/github/mauricio/async/db/general/ArrayRowData.scala b/db-async-common/src/main/scala/com/github/mauricio/async/db/general/ArrayRowData.scala
index c232a12a..fe582481 100644
--- a/db-async-common/src/main/scala/com/github/mauricio/async/db/general/ArrayRowData.scala
+++ b/db-async-common/src/main/scala/com/github/mauricio/async/db/general/ArrayRowData.scala
@@ -17,14 +17,10 @@
package com.github.mauricio.async.db.general
import com.github.mauricio.async.db.RowData
-import scala.collection.mutable
-class ArrayRowData( columnCount : Int, row : Int, val mapping : Map[String, Int] )
- extends RowData
+class ArrayRowData(row : Int, val mapping : Map[String, Int], val columns : Array[Any]) extends RowData
{
- private val columns = new Array[Any](columnCount)
-
/**
*
* Returns a column value by it's position in the originating query.
@@ -51,16 +47,5 @@ class ArrayRowData( columnCount : Int, row : Int, val mapping : Map[String, Int]
*/
def rowNumber: Int = row
- /**
- *
- * Sets a value to a column in this collection.
- *
- * @param i
- * @param x
- */
-
- def update(i: Int, x: Any) = columns(i) = x
-
def length: Int = columns.length
-
}
diff --git a/db-async-common/src/main/scala/com/github/mauricio/async/db/general/MutableResultSet.scala b/db-async-common/src/main/scala/com/github/mauricio/async/db/general/MutableResultSet.scala
index 0422a4cf..00cc712b 100644
--- a/db-async-common/src/main/scala/com/github/mauricio/async/db/general/MutableResultSet.scala
+++ b/db-async-common/src/main/scala/com/github/mauricio/async/db/general/MutableResultSet.scala
@@ -31,22 +31,18 @@ class MutableResultSet[T <: ColumnData](
private val columnMapping: Map[String, Int] = this.columnTypes.indices.map(
index =>
( this.columnTypes(index).name, index ) ).toMap
-
+
val columnNames : IndexedSeq[String] = this.columnTypes.map(c => c.name)
+ val types : IndexedSeq[Int] = this.columnTypes.map(c => c.dataType)
+
override def length: Int = this.rows.length
override def apply(idx: Int): RowData = this.rows(idx)
- def addRow( row : Seq[Any] ) {
- val realRow = new ArrayRowData( columnTypes.size, this.rows.size, this.columnMapping )
- var x = 0
- while ( x < row.size ) {
- realRow(x) = row(x)
- x += 1
- }
- this.rows += realRow
+ def addRow(row : Array[Any] ) {
+ this.rows += new ArrayRowData(this.rows.size, this.columnMapping, row)
}
-}
\ No newline at end of file
+}
diff --git a/db-async-common/src/main/scala/com/github/mauricio/async/db/pool/AsyncObjectPool.scala b/db-async-common/src/main/scala/com/github/mauricio/async/db/pool/AsyncObjectPool.scala
index 39179737..3e4345a8 100644
--- a/db-async-common/src/main/scala/com/github/mauricio/async/db/pool/AsyncObjectPool.scala
+++ b/db-async-common/src/main/scala/com/github/mauricio/async/db/pool/AsyncObjectPool.scala
@@ -16,7 +16,7 @@
package com.github.mauricio.async.db.pool
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future, Promise}
/**
*
@@ -70,11 +70,26 @@ trait AsyncObjectPool[T] {
* @return f wrapped with take and giveBack
*/
- def use[A](f : T => Future[A])(implicit executionContext : scala.concurrent.ExecutionContext) : Future[A] =
+ def use[A](f: (T) => Future[A])(implicit executionContext: ExecutionContext): Future[A] =
take.flatMap { item =>
- f(item).andThen { case _ =>
- giveBack(item)
+ val p = Promise[A]()
+ try {
+ f(item).onComplete { r =>
+ giveBack(item).onComplete { _ =>
+ p.complete(r)
+ }
+ }
+ } catch {
+ // calling f might throw exception.
+ // in that case the item will be removed from the pool if identified as invalid by the factory.
+ // the error returned to the user is the original error thrown by f.
+ case error: Throwable =>
+ giveBack(item).onComplete { _ =>
+ p.failure(error)
+ }
}
+
+ p.future
}
}
diff --git a/db-async-common/src/main/scala/com/github/mauricio/async/db/pool/TimeoutScheduler.scala b/db-async-common/src/main/scala/com/github/mauricio/async/db/pool/TimeoutScheduler.scala
new file mode 100644
index 00000000..d97a9ca1
--- /dev/null
+++ b/db-async-common/src/main/scala/com/github/mauricio/async/db/pool/TimeoutScheduler.scala
@@ -0,0 +1,63 @@
+package com.github.mauricio.async.db.pool
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{TimeUnit, TimeoutException, ScheduledFuture}
+import io.netty.channel.EventLoopGroup
+import scala.concurrent.{ExecutionContext, Promise}
+import scala.concurrent.duration.Duration
+
+trait TimeoutScheduler {
+
+ private var isTimeoutedBool = new AtomicBoolean(false)
+
+ /**
+ *
+ * The event loop group to be used for scheduling.
+ *
+ * @return
+ */
+
+ def eventLoopGroup : EventLoopGroup
+
+ /**
+ * Implementors should decide here what they want to do when a timeout occur
+ */
+
+ def onTimeout // implementors should decide here what they want to do when a timeout occur
+
+ /**
+ *
+ * We need this property as isClosed takes time to complete and
+ * we don't want the connection to be used again.
+ *
+ * @return
+ */
+
+ def isTimeouted : Boolean =
+ isTimeoutedBool.get
+
+ def addTimeout[A](
+ promise: Promise[A],
+ durationOption: Option[Duration])
+ (implicit executionContext : ExecutionContext) : Option[ScheduledFuture[_]] = {
+ durationOption.map {
+ duration =>
+ val scheduledFuture = schedule(
+ {
+ if (promise.tryFailure(new TimeoutException(s"Operation is timeouted after it took too long to return (${duration})"))) {
+ isTimeoutedBool.set(true)
+ onTimeout
+ }
+ },
+ duration)
+ promise.future.onComplete(x => scheduledFuture.cancel(false))
+
+ scheduledFuture
+ }
+ }
+
+ def schedule(block: => Unit, duration: Duration) : ScheduledFuture[_] =
+ eventLoopGroup.schedule(new Runnable {
+ override def run(): Unit = block
+ }, duration.toMillis, TimeUnit.MILLISECONDS)
+}
diff --git a/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/DummyTimeoutScheduler.scala b/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/DummyTimeoutScheduler.scala
new file mode 100644
index 00000000..6935259e
--- /dev/null
+++ b/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/DummyTimeoutScheduler.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2013 Maurício Linhares
+ *
+ * Maurício Linhares licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.mauricio.async.db.pool
+
+import java.util.concurrent.atomic.AtomicInteger
+import com.github.mauricio.async.db.util.{NettyUtils, ExecutorServiceUtils}
+import io.netty.channel.EventLoopGroup
+
+/**
+ * Implementation of TimeoutScheduler used for testing
+ */
+class DummyTimeoutScheduler extends TimeoutScheduler {
+ implicit val internalPool = ExecutorServiceUtils.CachedExecutionContext
+ private val timeOuts = new AtomicInteger
+ override def onTimeout = timeOuts.incrementAndGet
+ def timeoutCount = timeOuts.get()
+ def eventLoopGroup : EventLoopGroup = NettyUtils.DefaultEventLoopGroup
+}
diff --git a/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/PartitionedAsyncObjectPoolSpec.scala b/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/PartitionedAsyncObjectPoolSpec.scala
index 3b84755d..51d58fb0 100644
--- a/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/PartitionedAsyncObjectPoolSpec.scala
+++ b/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/PartitionedAsyncObjectPoolSpec.scala
@@ -1,5 +1,7 @@
package com.github.mauricio.async.db.pool
+import java.util.concurrent.atomic.AtomicInteger
+
import org.specs2.mutable.Specification
import scala.util.Try
import scala.concurrent.Await
@@ -17,17 +19,16 @@ class PartitionedAsyncObjectPoolSpec extends SpecificationWithJUnit {
val config =
PoolConfiguration(100, Long.MaxValue, 100, Int.MaxValue)
-
+ private var current = new AtomicInteger
val factory = new ObjectFactory[Int] {
var reject = Set[Int]()
var failCreate = false
- private var current = 0
+
def create =
if (failCreate)
throw new IllegalStateException
else {
- current += 1
- current
+ current.incrementAndGet()
}
def destroy(item: Int) = {}
def validate(item: Int) =
diff --git a/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/TimeoutSchedulerSpec.scala b/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/TimeoutSchedulerSpec.scala
new file mode 100644
index 00000000..acc952e7
--- /dev/null
+++ b/db-async-common/src/test/scala/com/github/mauricio/async/db/pool/TimeoutSchedulerSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2013 Maurício Linhares
+ *
+ * Maurício Linhares licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package com.github.mauricio.async.db.pool
+
+import java.util.concurrent.{ScheduledFuture, TimeoutException}
+import com.github.mauricio.async.db.util.{ByteBufferUtils, ExecutorServiceUtils}
+import org.specs2.mutable.SpecificationWithJUnit
+import scala.concurrent.duration._
+import scala.concurrent.{Future, Promise}
+
+/**
+ * Tests for TimeoutScheduler
+ */
+class TimeoutSchedulerSpec extends SpecificationWithJUnit {
+
+ val TIMEOUT_DID_NOT_PASS = "timeout did not pass"
+
+ "test timeout did not pass" in {
+ val timeoutScheduler = new DummyTimeoutScheduler()
+ val promise = Promise[String]()
+ val scheduledFuture = timeoutScheduler.addTimeout(promise,Some(Duration(1000, MILLISECONDS)))
+ Thread.sleep(100);
+ promise.isCompleted === false
+ promise.success(TIMEOUT_DID_NOT_PASS)
+ Thread.sleep(1500)
+ promise.future.value.get.get === TIMEOUT_DID_NOT_PASS
+ scheduledFuture.get.isCancelled === true
+ timeoutScheduler.timeoutCount === 0
+ }
+
+ "test timeout passed" in {
+ val timeoutMillis = 100
+ val promise = Promise[String]()
+ val timeoutScheduler = new DummyTimeoutScheduler()
+ val scheduledFuture = timeoutScheduler.addTimeout(promise,Some(Duration(timeoutMillis, MILLISECONDS)))
+ Thread.sleep(1000)
+ promise.isCompleted === true
+ scheduledFuture.get.isCancelled === false
+ promise.trySuccess(TIMEOUT_DID_NOT_PASS)
+ timeoutScheduler.timeoutCount === 1
+ promise.future.value.get.get must throwA[TimeoutException](message = s"Operation is timeouted after it took too long to return \\(${timeoutMillis} milliseconds\\)")
+ }
+
+ "test no timeout" in {
+ val timeoutScheduler = new DummyTimeoutScheduler()
+ val promise = Promise[String]()
+ val scheduledFuture = timeoutScheduler.addTimeout(promise,None)
+ Thread.sleep(1000)
+ scheduledFuture === None
+ promise.isCompleted === false
+ promise.success(TIMEOUT_DID_NOT_PASS)
+ promise.future.value.get.get === TIMEOUT_DID_NOT_PASS
+ timeoutScheduler.timeoutCount === 0
+ }
+}
+
diff --git a/mysql-async/README.md b/mysql-async/README.md
index 207d60d1..3a152286 100644
--- a/mysql-async/README.md
+++ b/mysql-async/README.md
@@ -89,6 +89,10 @@ java.sql.Timestamp | timestamp
java.sql.Time | time
String | string
Array[Byte] | blob
+java.nio.ByteBuffer | blob
+io.netty.buffer.ByteBuf | blob
+
+The maximum size of a blob is 2^24-9 bytes (almost 16 MiB).
You don't have to match exact values when sending parameters for your prepared statements, MySQL is usually smart
enough to understand that if you have sent an Int to `smallint` column it has to truncate the 4 bytes into 2.
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala
index a48e8739..cb4a85b0 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala
@@ -16,21 +16,22 @@
package com.github.mauricio.async.db.mysql
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+
import com.github.mauricio.async.db._
import com.github.mauricio.async.db.exceptions._
-import com.github.mauricio.async.db.mysql.codec.{MySQLHandlerDelegate, MySQLConnectionHandler}
+import com.github.mauricio.async.db.mysql.codec.{MySQLConnectionHandler, MySQLHandlerDelegate}
import com.github.mauricio.async.db.mysql.exceptions.MySQLException
import com.github.mauricio.async.db.mysql.message.client._
import com.github.mauricio.async.db.mysql.message.server._
import com.github.mauricio.async.db.mysql.util.CharsetMapper
+import com.github.mauricio.async.db.pool.TimeoutScheduler
import com.github.mauricio.async.db.util.ChannelFutureTransformer.toFuture
import com.github.mauricio.async.db.util._
-import java.util.concurrent.atomic.{AtomicLong,AtomicReference}
-import scala.concurrent.{ExecutionContext, Promise, Future}
-import io.netty.channel.{EventLoopGroup, ChannelHandlerContext}
-import scala.util.Failure
-import scala.Some
-import scala.util.Success
+import io.netty.channel.{ChannelHandlerContext, EventLoopGroup}
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
object MySQLConnection {
final val Counter = new AtomicLong()
@@ -42,10 +43,11 @@ class MySQLConnection(
configuration: Configuration,
charsetMapper: CharsetMapper = CharsetMapper.Instance,
group : EventLoopGroup = NettyUtils.DefaultEventLoopGroup,
- executionContext : ExecutionContext = ExecutorServiceUtils.CachedExecutionContext
+ implicit val executionContext : ExecutionContext = ExecutorServiceUtils.CachedExecutionContext
)
extends MySQLHandlerDelegate
with Connection
+ with TimeoutScheduler
{
import MySQLConnection.log
@@ -53,10 +55,8 @@ class MySQLConnection(
// validate that this charset is supported
charsetMapper.toInt(configuration.charset)
-
private final val connectionCount = MySQLConnection.Counter.incrementAndGet()
private final val connectionId = s"[mysql-connection-$connectionCount]"
- private implicit val internalPool = executionContext
private final val connectionHandler = new MySQLConnectionHandler(
configuration,
@@ -78,6 +78,8 @@ class MySQLConnection(
def lastException : Throwable = this._lastException
def count : Long = this.connectionCount
+ override def eventLoopGroup : EventLoopGroup = group
+
def connect: Future[Connection] = {
this.connectionHandler.connect.onFailure {
case e => this.connectionPromise.tryFailure(e)
@@ -185,18 +187,17 @@ class MySQLConnection(
def sendQuery(query: String): Future[QueryResult] = {
this.validateIsReadyForQuery()
- val promise = Promise[QueryResult]
+ val promise = Promise[QueryResult]()
this.setQueryPromise(promise)
this.connectionHandler.write(new QueryMessage(query))
+ addTimeout(promise, configuration.queryTimeout)
promise.future
}
private def failQueryPromise(t: Throwable) {
-
this.clearQueryPromise.foreach {
_.tryFailure(t)
}
-
}
private def succeedQueryPromise(queryResult: QueryResult) {
@@ -225,6 +226,7 @@ class MySQLConnection(
}
def disconnect: Future[Connection] = this.close
+ override def onTimeout = disconnect
def isConnected: Boolean = this.connectionHandler.isConnected
@@ -234,9 +236,10 @@ class MySQLConnection(
if ( values.length != totalParameters ) {
throw new InsufficientParametersException(totalParameters, values)
}
- val promise = Promise[QueryResult]
+ val promise = Promise[QueryResult]()
this.setQueryPromise(promise)
- this.connectionHandler.write(new PreparedStatementMessage(query, values))
+ this.connectionHandler.sendPreparedStatement(query, values)
+ addTimeout(promise,configuration.queryTimeout)
promise.future
}
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLQueryResult.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLQueryResult.scala
index 7b9cfe57..e7619685 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLQueryResult.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLQueryResult.scala
@@ -19,9 +19,9 @@ package com.github.mauricio.async.db.mysql
import com.github.mauricio.async.db.{ResultSet, QueryResult}
class MySQLQueryResult(
- rowsAffected: Long,
- message: String,
- lastInsertId: Long,
- statusFlags: Int,
- warnings: Int,
- rows: Option[ResultSet] = None) extends QueryResult(rowsAffected, message, rows)
\ No newline at end of file
+ rowsAffected: Long,
+ message: String,
+ val lastInsertId: Long,
+ val statusFlags: Int,
+ val warnings: Int,
+ rows: Option[ResultSet] = None) extends QueryResult(rowsAffected, message, rows)
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowDecoder.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowDecoder.scala
index 0f59ca5e..22c6cee5 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowDecoder.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowDecoder.scala
@@ -31,7 +31,7 @@ class BinaryRowDecoder {
//import BinaryRowDecoder._
- def decode(buffer: ByteBuf, columns: Seq[ColumnDefinitionMessage]): IndexedSeq[Any] = {
+ def decode(buffer: ByteBuf, columns: Seq[ColumnDefinitionMessage]): Array[Any] = {
//log.debug("columns are {} - {}", buffer.readableBytes(), columns)
//log.debug( "decoding row\n{}", MySQLHelper.dumpAsHex(buffer))
@@ -79,7 +79,7 @@ class BinaryRowDecoder {
throw new BufferNotFullyConsumedException(buffer)
}
- row
+ row.toArray
}
}
\ No newline at end of file
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoder.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoder.scala
index c904259e..aff0b36f 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoder.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoder.scala
@@ -16,14 +16,13 @@
package com.github.mauricio.async.db.mysql.binary
-import io.netty.buffer.{Unpooled, ByteBuf}
+import java.nio.ByteBuffer
import java.nio.charset.Charset
+
import com.github.mauricio.async.db.mysql.binary.encoder._
import com.github.mauricio.async.db.util._
+import io.netty.buffer.ByteBuf
import org.joda.time._
-import scala.Some
-import com.github.mauricio.async.db.mysql.column.ColumnTypes
-import java.nio.ByteOrder
object BinaryRowEncoder {
final val log = Log.get[BinaryRowEncoder]
@@ -31,8 +30,6 @@ object BinaryRowEncoder {
class BinaryRowEncoder( charset : Charset ) {
- import BinaryRowEncoder.log
-
private final val stringEncoder = new StringEncoder(charset)
private final val encoders = Map[Class[_],BinaryEncoder](
classOf[String] -> this.stringEncoder,
@@ -65,48 +62,7 @@ class BinaryRowEncoder( charset : Charset ) {
classOf[java.lang.Boolean] -> BooleanEncoder
)
- def encode( values : Seq[Any] ) : ByteBuf = {
-
- val nullBitsCount = (values.size + 7) / 8
- val nullBits = new Array[Byte](nullBitsCount)
- val bitMapBuffer = ByteBufferUtils.mysqlBuffer(1 + nullBitsCount)
- val parameterTypesBuffer = ByteBufferUtils.mysqlBuffer(values.size * 2)
- val parameterValuesBuffer = ByteBufferUtils.mysqlBuffer()
-
-
- var index = 0
-
- while ( index < values.length ) {
- val value = values(index)
- if ( value == null || value == None ) {
- nullBits(index / 8) = (nullBits(index / 8) | (1 << (index & 7))).asInstanceOf[Byte]
- parameterTypesBuffer.writeShort(ColumnTypes.FIELD_TYPE_NULL)
- } else {
- value match {
- case Some(v) => encode(parameterTypesBuffer, parameterValuesBuffer, v)
- case _ => encode(parameterTypesBuffer, parameterValuesBuffer, value)
- }
- }
- index += 1
- }
-
- bitMapBuffer.writeBytes(nullBits)
- if ( values.size > 0 ) {
- bitMapBuffer.writeByte(1)
- } else {
- bitMapBuffer.writeByte(0)
- }
-
- Unpooled.wrappedBuffer( bitMapBuffer, parameterTypesBuffer, parameterValuesBuffer )
- }
-
- private def encode(parameterTypesBuffer: ByteBuf, parameterValuesBuffer: ByteBuf, value: Any): Unit = {
- val encoder = encoderFor(value)
- parameterTypesBuffer.writeShort(encoder.encodesTo)
- encoder.encode(value, parameterValuesBuffer)
- }
-
- private def encoderFor( v : Any ) : BinaryEncoder = {
+ def encoderFor( v : Any ) : BinaryEncoder = {
this.encoders.get(v.getClass) match {
case Some(encoder) => encoder
@@ -128,6 +84,8 @@ class BinaryRowEncoder( charset : Charset ) {
case v : java.sql.Time => SQLTimeEncoder
case v : scala.concurrent.duration.Duration => DurationEncoder
case v : java.util.Date => JavaDateEncoder
+ case v : ByteBuffer => ByteBufferEncoder
+ case v : ByteBuf => ByteBufEncoder
}
}
}
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/encoder/ByteBufEncoder.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/encoder/ByteBufEncoder.scala
new file mode 100644
index 00000000..62b62560
--- /dev/null
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/encoder/ByteBufEncoder.scala
@@ -0,0 +1,17 @@
+package com.github.mauricio.async.db.mysql.binary.encoder
+
+import com.github.mauricio.async.db.mysql.column.ColumnTypes
+import com.github.mauricio.async.db.util.ChannelWrapper.bufferToWrapper
+import io.netty.buffer.ByteBuf
+
+object ByteBufEncoder extends BinaryEncoder {
+ def encode(value: Any, buffer: ByteBuf) {
+ val bytes = value.asInstanceOf[ByteBuf]
+
+ buffer.writeLength(bytes.readableBytes())
+ buffer.writeBytes(bytes)
+ }
+
+ def encodesTo: Int = ColumnTypes.FIELD_TYPE_BLOB
+
+}
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/encoder/ByteBufferEncoder.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/encoder/ByteBufferEncoder.scala
new file mode 100644
index 00000000..329709ad
--- /dev/null
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/encoder/ByteBufferEncoder.scala
@@ -0,0 +1,19 @@
+package com.github.mauricio.async.db.mysql.binary.encoder
+
+import java.nio.ByteBuffer
+
+import com.github.mauricio.async.db.mysql.column.ColumnTypes
+import com.github.mauricio.async.db.util.ChannelWrapper.bufferToWrapper
+import io.netty.buffer.ByteBuf
+
+object ByteBufferEncoder extends BinaryEncoder {
+ def encode(value: Any, buffer: ByteBuf) {
+ val bytes = value.asInstanceOf[ByteBuffer]
+
+ buffer.writeLength(bytes.remaining())
+ buffer.writeBytes(bytes)
+ }
+
+ def encodesTo: Int = ColumnTypes.FIELD_TYPE_BLOB
+
+}
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLConnectionHandler.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLConnectionHandler.scala
index 27ff04da..792aff77 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLConnectionHandler.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLConnectionHandler.scala
@@ -16,7 +16,12 @@
package com.github.mauricio.async.db.mysql.codec
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
import com.github.mauricio.async.db.Configuration
+import com.github.mauricio.async.db.exceptions.DatabaseException
import com.github.mauricio.async.db.general.MutableResultSet
import com.github.mauricio.async.db.mysql.binary.BinaryRowDecoder
import com.github.mauricio.async.db.mysql.message.client._
@@ -25,16 +30,15 @@ import com.github.mauricio.async.db.mysql.util.CharsetMapper
import com.github.mauricio.async.db.util.ChannelFutureTransformer.toFuture
import com.github.mauricio.async.db.util._
import io.netty.bootstrap.Bootstrap
-import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.{ByteBuf, ByteBufAllocator, Unpooled}
import io.netty.channel._
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.CodecException
-import java.net.InetSocketAddress
-import scala.Some
+
import scala.annotation.switch
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent._
-import com.github.mauricio.async.db.exceptions.DatabaseException
+import scala.concurrent.duration.Duration
class MySQLConnectionHandler(
configuration: Configuration,
@@ -52,13 +56,14 @@ class MySQLConnectionHandler(
private final val connectionPromise = Promise[MySQLConnectionHandler]
private final val decoder = new MySQLFrameDecoder(configuration.charset, connectionId)
private final val encoder = new MySQLOneToOneEncoder(configuration.charset, charsetMapper)
+ private final val sendLongDataEncoder = new SendLongDataEncoder()
private final val currentParameters = new ArrayBuffer[ColumnDefinitionMessage]()
private final val currentColumns = new ArrayBuffer[ColumnDefinitionMessage]()
private final val parsedStatements = new HashMap[String,PreparedStatementHolder]()
private final val binaryRowDecoder = new BinaryRowDecoder()
private var currentPreparedStatementHolder : PreparedStatementHolder = null
- private var currentPreparedStatement : PreparedStatementMessage = null
+ private var currentPreparedStatement : PreparedStatement = null
private var currentQuery : MutableResultSet[ColumnDefinitionMessage] = null
private var currentContext: ChannelHandlerContext = null
@@ -70,6 +75,7 @@ class MySQLConnectionHandler(
channel.pipeline.addLast(
decoder,
encoder,
+ sendLongDataEncoder,
MySQLConnectionHandler.this)
}
@@ -185,20 +191,21 @@ class MySQLConnectionHandler(
writeAndHandleError(message)
}
- def write( message : PreparedStatementMessage ) {
+ def sendPreparedStatement( query: String, values: Seq[Any] ): Future[ChannelFuture] = {
+ val preparedStatement = new PreparedStatement(query, values)
this.currentColumns.clear()
this.currentParameters.clear()
- this.currentPreparedStatement = message
+ this.currentPreparedStatement = preparedStatement
- this.parsedStatements.get(message.statement) match {
+ this.parsedStatements.get(preparedStatement.statement) match {
case Some( item ) => {
- this.executePreparedStatement(item.statementId, item.columns.size, message.values, item.parameters)
+ this.executePreparedStatement(item.statementId, item.columns.size, preparedStatement.values, item.parameters)
}
case None => {
decoder.preparedStatementPrepareStarted()
- writeAndHandleError( new PreparedStatementPrepareMessage(message.statement) )
+ writeAndHandleError( new PreparedStatementPrepareMessage(preparedStatement.statement) )
}
}
}
@@ -230,11 +237,60 @@ class MySQLConnectionHandler(
}
}
- private def executePreparedStatement( statementId : Array[Byte], columnsCount : Int, values : Seq[Any], parameters : Seq[ColumnDefinitionMessage] ) {
+ private def executePreparedStatement( statementId : Array[Byte], columnsCount : Int, values : Seq[Any], parameters : Seq[ColumnDefinitionMessage] ): Future[ChannelFuture] = {
decoder.preparedStatementExecuteStarted(columnsCount, parameters.size)
this.currentColumns.clear()
this.currentParameters.clear()
- writeAndHandleError(new PreparedStatementExecuteMessage( statementId, values, parameters ))
+
+ val (nonLongIndicesOpt, longValuesOpt) = values.zipWithIndex.map {
+ case (Some(value), index) if isLong(value) => (None, Some(index, value))
+ case (value, index) if isLong(value) => (None, Some(index, value))
+ case (_, index) => (Some(index), None)
+ }.unzip
+ val nonLongIndices: Seq[Int] = nonLongIndicesOpt.flatten
+ val longValues: Seq[(Int, Any)] = longValuesOpt.flatten
+
+ if (longValues.nonEmpty) {
+ val (firstIndex, firstValue) = longValues.head
+ var channelFuture: Future[ChannelFuture] = sendLongParameter(statementId, firstIndex, firstValue)
+ longValues.tail foreach { case (index, value) =>
+ channelFuture = channelFuture.flatMap { _ =>
+ sendLongParameter(statementId, index, value)
+ }
+ }
+ channelFuture flatMap { _ =>
+ writeAndHandleError(new PreparedStatementExecuteMessage(statementId, values, nonLongIndices.toSet, parameters))
+ }
+ } else {
+ writeAndHandleError(new PreparedStatementExecuteMessage(statementId, values, nonLongIndices.toSet, parameters))
+ }
+ }
+
+ private def isLong(value: Any): Boolean = {
+ value match {
+ case v : Array[Byte] => v.length > SendLongDataEncoder.LONG_THRESHOLD
+ case v : ByteBuffer => v.remaining() > SendLongDataEncoder.LONG_THRESHOLD
+ case v : ByteBuf => v.readableBytes() > SendLongDataEncoder.LONG_THRESHOLD
+
+ case _ => false
+ }
+ }
+
+ private def sendLongParameter(statementId: Array[Byte], index: Int, longValue: Any): Future[ChannelFuture] = {
+ longValue match {
+ case v : Array[Byte] =>
+ sendBuffer(Unpooled.wrappedBuffer(v), statementId, index)
+
+ case v : ByteBuffer =>
+ sendBuffer(Unpooled.wrappedBuffer(v), statementId, index)
+
+ case v : ByteBuf =>
+ sendBuffer(v, statementId, index)
+ }
+ }
+
+ private def sendBuffer(buffer: ByteBuf, statementId: Array[Byte], paramId: Int): ChannelFuture = {
+ writeAndHandleError(new SendLongDataMessage(statementId, buffer, paramId))
}
private def onPreparedStatementPrepareResponse( message : PreparedStatementPrepareResponse ) {
@@ -265,17 +321,18 @@ class MySQLConnectionHandler(
}
private def writeAndHandleError( message : Any ) : ChannelFuture = {
-
if ( this.currentContext.channel().isActive ) {
- val future = this.currentContext.writeAndFlush(message)
+ val res = this.currentContext.writeAndFlush(message)
- future.onFailure {
+ res.onFailure {
case e : Throwable => handleException(e)
}
- future
+ res
} else {
- throw new DatabaseException("This channel is not active and can't take messages")
+ val error = new DatabaseException("This channel is not active and can't take messages")
+ handleException(error)
+ this.currentContext.channel().newFailedFuture(error)
}
}
@@ -297,4 +354,10 @@ class MySQLConnectionHandler(
}
}
+ def schedule(block: => Unit, duration: Duration): Unit = {
+ this.currentContext.channel().eventLoop().schedule(new Runnable {
+ override def run(): Unit = block
+ }, duration.toMillis, TimeUnit.MILLISECONDS)
+ }
+
}
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLOneToOneEncoder.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLOneToOneEncoder.scala
index 074a8b6a..f666cbc8 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLOneToOneEncoder.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLOneToOneEncoder.scala
@@ -32,7 +32,8 @@ object MySQLOneToOneEncoder {
val log = Log.get[MySQLOneToOneEncoder]
}
-class MySQLOneToOneEncoder(charset: Charset, charsetMapper: CharsetMapper) extends MessageToMessageEncoder[Any] {
+class MySQLOneToOneEncoder(charset: Charset, charsetMapper: CharsetMapper)
+ extends MessageToMessageEncoder[ClientMessage](classOf[ClientMessage]) {
import MySQLOneToOneEncoder.log
@@ -45,49 +46,43 @@ class MySQLOneToOneEncoder(charset: Charset, charsetMapper: CharsetMapper) exten
private var sequence = 1
- def encode(ctx: ChannelHandlerContext, msg: Any, out: java.util.List[Object]): Unit = {
-
- msg match {
- case message: ClientMessage => {
- val encoder = (message.kind: @switch) match {
- case ClientMessage.ClientProtocolVersion => this.handshakeResponseEncoder
- case ClientMessage.Quit => {
- sequence = 0
- QuitMessageEncoder
- }
- case ClientMessage.Query => {
- sequence = 0
- this.queryEncoder
- }
- case ClientMessage.PreparedStatementExecute => {
- sequence = 0
- this.executeEncoder
- }
- case ClientMessage.PreparedStatementPrepare => {
- sequence = 0
- this.prepareEncoder
- }
- case ClientMessage.AuthSwitchResponse => {
- sequence += 1
- this.authenticationSwitchEncoder
- }
- case _ => throw new EncoderNotAvailableException(message)
- }
-
- val result = encoder.encode(message)
+ def encode(ctx: ChannelHandlerContext, message: ClientMessage, out: java.util.List[Object]): Unit = {
+ val encoder = (message.kind: @switch) match {
+ case ClientMessage.ClientProtocolVersion => this.handshakeResponseEncoder
+ case ClientMessage.Quit => {
+ sequence = 0
+ QuitMessageEncoder
+ }
+ case ClientMessage.Query => {
+ sequence = 0
+ this.queryEncoder
+ }
+ case ClientMessage.PreparedStatementExecute => {
+ sequence = 0
+ this.executeEncoder
+ }
+ case ClientMessage.PreparedStatementPrepare => {
+ sequence = 0
+ this.prepareEncoder
+ }
+ case ClientMessage.AuthSwitchResponse => {
+ sequence += 1
+ this.authenticationSwitchEncoder
+ }
+ case _ => throw new EncoderNotAvailableException(message)
+ }
- ByteBufferUtils.writePacketLength(result, sequence)
+ val result: ByteBuf = encoder.encode(message)
- sequence += 1
+ ByteBufferUtils.writePacketLength(result, sequence)
- if ( log.isTraceEnabled ) {
- log.trace(s"Writing message ${message.getClass.getName} - \n${BufferDumper.dumpAsHex(result)}")
- }
+ sequence += 1
- out.add(result)
- }
+ if ( log.isTraceEnabled ) {
+ log.trace(s"Writing message ${message.getClass.getName} - \n${BufferDumper.dumpAsHex(result)}")
}
+ out.add(result)
}
}
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/PreparedStatementMessage.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/PreparedStatement.scala
similarity index 76%
rename from mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/PreparedStatementMessage.scala
rename to mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/PreparedStatement.scala
index 0e52dad6..08fb0d9f 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/PreparedStatementMessage.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/PreparedStatement.scala
@@ -14,7 +14,6 @@
* under the License.
*/
-package com.github.mauricio.async.db.mysql.message.client
+package com.github.mauricio.async.db.mysql.codec
-case class PreparedStatementMessage ( statement : String, values : Seq[Any])
- extends ClientMessage( ClientMessage.PreparedStatement )
\ No newline at end of file
+case class PreparedStatement ( statement : String, values : Seq[Any])
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/SendLongDataEncoder.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/SendLongDataEncoder.scala
new file mode 100644
index 00000000..ce51140f
--- /dev/null
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/SendLongDataEncoder.scala
@@ -0,0 +1,40 @@
+package com.github.mauricio.async.db.mysql.codec
+
+import com.github.mauricio.async.db.mysql.message.client.{ClientMessage, SendLongDataMessage}
+import com.github.mauricio.async.db.util.{ByteBufferUtils, Log}
+import io.netty.buffer.Unpooled
+import io.netty.channel.ChannelHandlerContext
+import io.netty.handler.codec.MessageToMessageEncoder
+
+object SendLongDataEncoder {
+ val log = Log.get[SendLongDataEncoder]
+
+ val LONG_THRESHOLD = 1023
+}
+
+class SendLongDataEncoder
+ extends MessageToMessageEncoder[SendLongDataMessage](classOf[SendLongDataMessage]) {
+
+ import com.github.mauricio.async.db.mysql.codec.SendLongDataEncoder.log
+
+ def encode(ctx: ChannelHandlerContext, message: SendLongDataMessage, out: java.util.List[Object]): Unit = {
+ if ( log.isTraceEnabled ) {
+ log.trace(s"Writing message ${message.toString}")
+ }
+
+ val sequence = 0
+
+ val headerBuffer = ByteBufferUtils.mysqlBuffer(3 + 1 + 1 + 4 + 2)
+ ByteBufferUtils.write3BytesInt(headerBuffer, 1 + 4 + 2 + message.value.readableBytes())
+ headerBuffer.writeByte(sequence)
+
+ headerBuffer.writeByte(ClientMessage.PreparedStatementSendLongData)
+ headerBuffer.writeBytes(message.statementId)
+ headerBuffer.writeShort(message.paramId)
+
+ val result = Unpooled.wrappedBuffer(headerBuffer, message.value)
+
+ out.add(result)
+ }
+
+}
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/encoder/PreparedStatementExecuteEncoder.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/encoder/PreparedStatementExecuteEncoder.scala
index e21b15f6..c52658c9 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/encoder/PreparedStatementExecuteEncoder.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/encoder/PreparedStatementExecuteEncoder.scala
@@ -17,6 +17,7 @@
package com.github.mauricio.async.db.mysql.encoder
import io.netty.buffer.{ByteBuf, Unpooled}
+import com.github.mauricio.async.db.mysql.column.ColumnTypes
import com.github.mauricio.async.db.mysql.binary.BinaryRowEncoder
import com.github.mauricio.async.db.mysql.message.client.{PreparedStatementExecuteMessage, ClientMessage}
import com.github.mauricio.async.db.util.ByteBufferUtils
@@ -35,10 +36,49 @@ class PreparedStatementExecuteEncoder( rowEncoder : BinaryRowEncoder ) extends M
if ( m.parameters.isEmpty ) {
buffer
} else {
- val parametersBuffer = rowEncoder.encode(m.values)
- Unpooled.wrappedBuffer(buffer, parametersBuffer)
+ Unpooled.wrappedBuffer(buffer, encodeValues(m.values, m.valuesToInclude))
}
}
+ private[encoder] def encodeValues( values : Seq[Any], valuesToInclude: Set[Int] ) : ByteBuf = {
+ val nullBitsCount = (values.size + 7) / 8
+ val nullBits = new Array[Byte](nullBitsCount)
+ val bitMapBuffer = ByteBufferUtils.mysqlBuffer(1 + nullBitsCount)
+ val parameterTypesBuffer = ByteBufferUtils.mysqlBuffer(values.size * 2)
+ val parameterValuesBuffer = ByteBufferUtils.mysqlBuffer()
+
+ var index = 0
+
+ while ( index < values.length ) {
+ val value = values(index)
+ if ( value == null || value == None ) {
+ nullBits(index / 8) = (nullBits(index / 8) | (1 << (index & 7))).asInstanceOf[Byte]
+ parameterTypesBuffer.writeShort(ColumnTypes.FIELD_TYPE_NULL)
+ } else {
+ value match {
+ case Some(v) => encodeValue(parameterTypesBuffer, parameterValuesBuffer, v, valuesToInclude(index))
+ case _ => encodeValue(parameterTypesBuffer, parameterValuesBuffer, value, valuesToInclude(index))
+ }
+ }
+ index += 1
+ }
+
+ bitMapBuffer.writeBytes(nullBits)
+ if ( values.size > 0 ) {
+ bitMapBuffer.writeByte(1)
+ } else {
+ bitMapBuffer.writeByte(0)
+ }
+
+ Unpooled.wrappedBuffer( bitMapBuffer, parameterTypesBuffer, parameterValuesBuffer )
+ }
+
+ private def encodeValue(parameterTypesBuffer: ByteBuf, parameterValuesBuffer: ByteBuf, value: Any, includeValue: Boolean) : Unit = {
+ val encoder = rowEncoder.encoderFor(value)
+ parameterTypesBuffer.writeShort(encoder.encodesTo)
+ if (includeValue)
+ encoder.encode(value, parameterValuesBuffer)
+ }
+
}
\ No newline at end of file
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/ClientMessage.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/ClientMessage.scala
index 72d0be13..2a2a1b1f 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/ClientMessage.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/ClientMessage.scala
@@ -20,13 +20,13 @@ import com.github.mauricio.async.db.KindedMessage
object ClientMessage {
- final val ClientProtocolVersion = 0x09
- final val Quit = 0x01
- final val Query = 0x03
- final val PreparedStatementPrepare = 0x16
- final val PreparedStatementExecute = 0x17
- final val PreparedStatement = 0x18
- final val AuthSwitchResponse = 0xfe
+ final val ClientProtocolVersion = 0x09 // COM_STATISTICS
+ final val Quit = 0x01 // COM_QUIT
+ final val Query = 0x03 // COM_QUERY
+ final val PreparedStatementPrepare = 0x16 // COM_STMT_PREPARE
+ final val PreparedStatementExecute = 0x17 // COM_STMT_EXECUTE
+ final val PreparedStatementSendLongData = 0x18 // COM_STMT_SEND_LONG_DATA
+ final val AuthSwitchResponse = 0xfe // AuthSwitchRequest
}
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/PreparedStatementExecuteMessage.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/PreparedStatementExecuteMessage.scala
index 805ef51e..f87ddede 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/PreparedStatementExecuteMessage.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/PreparedStatementExecuteMessage.scala
@@ -21,5 +21,6 @@ import com.github.mauricio.async.db.mysql.message.server.ColumnDefinitionMessage
case class PreparedStatementExecuteMessage (
statementId : Array[Byte],
values : Seq[Any],
+ valuesToInclude : Set[Int],
parameters : Seq[ColumnDefinitionMessage] )
extends ClientMessage( ClientMessage.PreparedStatementExecute )
\ No newline at end of file
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/SendLongDataMessage.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/SendLongDataMessage.scala
new file mode 100644
index 00000000..db66db1f
--- /dev/null
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/SendLongDataMessage.scala
@@ -0,0 +1,8 @@
+package com.github.mauricio.async.db.mysql.message.client
+
+import io.netty.buffer.ByteBuf
+
+case class SendLongDataMessage (
+ statementId : Array[Byte],
+ value : ByteBuf,
+ paramId : Int )
\ No newline at end of file
diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/pool/MySQLConnectionFactory.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/pool/MySQLConnectionFactory.scala
index 83791366..273e76af 100644
--- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/pool/MySQLConnectionFactory.scala
+++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/pool/MySQLConnectionFactory.scala
@@ -21,9 +21,8 @@ import com.github.mauricio.async.db.pool.ObjectFactory
import com.github.mauricio.async.db.mysql.MySQLConnection
import scala.util.Try
import scala.concurrent.Await
-import scala.concurrent.duration._
import com.github.mauricio.async.db.util.Log
-import com.github.mauricio.async.db.exceptions.{ConnectionStillRunningQueryException, ConnectionNotConnectedException}
+import com.github.mauricio.async.db.exceptions.{ConnectionTimeoutedException, ConnectionStillRunningQueryException, ConnectionNotConnectedException}
object MySQLConnectionFactory {
final val log = Log.get[MySQLConnectionFactory]
@@ -90,7 +89,9 @@ class MySQLConnectionFactory( configuration : Configuration ) extends ObjectFact
*/
def validate(item: MySQLConnection): Try[MySQLConnection] = {
Try{
-
+ if ( item.isTimeouted ) {
+ throw new ConnectionTimeoutedException(item)
+ }
if ( !item.isConnected ) {
throw new ConnectionNotConnectedException(item)
}
diff --git a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/BinaryColumnsSpec.scala b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/BinaryColumnsSpec.scala
index 22912620..6c7c1313 100644
--- a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/BinaryColumnsSpec.scala
+++ b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/BinaryColumnsSpec.scala
@@ -2,6 +2,8 @@ package com.github.mauricio.async.db.mysql
import org.specs2.mutable.Specification
import java.util.UUID
+import java.nio.ByteBuffer
+import io.netty.buffer.Unpooled
import io.netty.util.CharsetUtil
import com.github.mauricio.async.db.RowData
@@ -96,6 +98,52 @@ class BinaryColumnsSpec extends Specification with ConnectionHelper {
}
+ "support BLOB type" in {
+
+ val bytes = (1 to 10).map(_.toByte).toArray
+
+ testBlob(bytes)
+
+ }
+
+ "support BLOB type with large values" in {
+
+ val bytes = (1 to 2100).map(_.toByte).toArray
+
+ testBlob(bytes)
+
+ }
+
+ }
+
+ def testBlob(bytes: Array[Byte]) = {
+ val create =
+ """CREATE TEMPORARY TABLE POSTS (
+ | id INT NOT NULL,
+ | blob_column BLOB,
+ | primary key (id))
+ """.stripMargin
+
+ val insert = "INSERT INTO POSTS (id,blob_column) VALUES (?,?)"
+ val select = "SELECT id,blob_column FROM POSTS ORDER BY id"
+
+ withConnection {
+ connection =>
+ executeQuery(connection, create)
+ executePreparedStatement(connection, insert, 1, Some(bytes))
+ executePreparedStatement(connection, insert, 2, ByteBuffer.wrap(bytes))
+ executePreparedStatement(connection, insert, 3, Unpooled.wrappedBuffer(bytes))
+
+ val Some(rows) = executeQuery(connection, select).rows
+ rows(0)("id") === 1
+ rows(0)("blob_column") === bytes
+ rows(1)("id") === 2
+ rows(1)("blob_column") === bytes
+ rows(2)("id") === 3
+ rows(2)("blob_column") === bytes
+ rows.size === 3
+ }
+
}
def compareBytes( row : RowData, column : String, expected : String ) =
diff --git a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/ConnectionHelper.scala b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/ConnectionHelper.scala
index 771fe1e3..8ace95e7 100644
--- a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/ConnectionHelper.scala
+++ b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/ConnectionHelper.scala
@@ -115,6 +115,19 @@ trait ConnectionHelper {
}
+ def withConfigurablePool[T]( configuration : Configuration )( fn : (ConnectionPool[MySQLConnection]) => T ) : T = {
+
+ val factory = new MySQLConnectionFactory(configuration)
+ val pool = new ConnectionPool[MySQLConnection](factory, PoolConfiguration.Default)
+
+ try {
+ fn(pool)
+ } finally {
+ awaitFuture( pool.close )
+ }
+
+ }
+
def withConnection[T]( fn : (MySQLConnection) => T ) : T =
withConfigurableConnection(this.defaultConfiguration)(fn)
diff --git a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/MySQLConnectionSpec.scala b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/MySQLConnectionSpec.scala
index aebf18dd..5e5500fa 100644
--- a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/MySQLConnectionSpec.scala
+++ b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/MySQLConnectionSpec.scala
@@ -30,8 +30,8 @@ class MySQLConnectionSpec extends Specification {
database = Some("mysql_async_tests")
)
- val rootConfiguration = new Configuration(
- "root",
+ val configurationWithoutPassword = new Configuration(
+ "mysql_async_nopw",
"localhost",
port = 3306,
password = None,
@@ -39,7 +39,7 @@ class MySQLConnectionSpec extends Specification {
)
val configurationWithoutDatabase = new Configuration(
- "root",
+ "mysql_async_nopw",
"localhost",
port = 3306,
password = None,
@@ -69,7 +69,7 @@ class MySQLConnectionSpec extends Specification {
withNonConnectedConnection({
connection =>
awaitFuture(connection.connect) === connection
- }) (rootConfiguration)
+ }) (configurationWithoutPassword)
}
"connect to a MySQL instance without a database" in {
diff --git a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/QueryTimeoutSpec.scala b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/QueryTimeoutSpec.scala
new file mode 100644
index 00000000..65827432
--- /dev/null
+++ b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/QueryTimeoutSpec.scala
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2013 Maurício Linhares
+ *
+ * Maurício Linhares licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.mauricio.async.db.mysql
+
+import java.util.concurrent.TimeoutException
+import com.github.mauricio.async.db.Configuration
+import org.specs2.execute.{AsResult, Success, ResultExecution}
+import org.specs2.mutable.Specification
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class QueryTimeoutSpec extends Specification with ConnectionHelper {
+ implicit def unitAsResult: AsResult[Unit] = new AsResult[Unit] {
+ def asResult(r: =>Unit) =
+ ResultExecution.execute(r)(_ => Success())
+ }
+ "Simple query with 1 nanosec timeout" in {
+ withConfigurablePool(shortTimeoutConfiguration) {
+ pool => {
+ val connection = Await.result(pool.take, Duration(10,SECONDS))
+ connection.isTimeouted === false
+ connection.isConnected === true
+ val queryResultFuture = connection.sendQuery("select sleep(1)")
+ Await.result(queryResultFuture, Duration(10,SECONDS)) must throwA[TimeoutException]()
+ connection.isTimeouted === true
+ Await.ready(pool.giveBack(connection), Duration(10,SECONDS))
+ pool.availables.count(_ == connection) === 0 // connection removed from pool
+ // we do not know when the connection will be closed.
+ }
+ }
+ }
+
+ "Simple query with 5 sec timeout" in {
+ withConfigurablePool(longTimeoutConfiguration) {
+ pool => {
+ val connection = Await.result(pool.take, Duration(10,SECONDS))
+ connection.isTimeouted === false
+ connection.isConnected === true
+ val queryResultFuture = connection.sendQuery("select sleep(1)")
+ Await.result(queryResultFuture, Duration(10,SECONDS)).rows.get.size === 1
+ connection.isTimeouted === false
+ connection.isConnected === true
+ Await.ready(pool.giveBack(connection), Duration(10,SECONDS))
+ pool.availables.count(_ == connection) === 1 // connection returned to pool
+ }
+ }
+ }
+
+ def shortTimeoutConfiguration = new Configuration(
+ "mysql_async",
+ "localhost",
+ port = 3306,
+ password = Some("root"),
+ database = Some("mysql_async_tests"),
+ queryTimeout = Some(Duration(1,NANOSECONDS))
+ )
+
+ def longTimeoutConfiguration = new Configuration(
+ "mysql_async",
+ "localhost",
+ port = 3306,
+ password = Some("root"),
+ database = Some("mysql_async_tests"),
+ queryTimeout = Some(Duration(5,SECONDS))
+ )
+}
diff --git a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/StoredProceduresSpec.scala b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/StoredProceduresSpec.scala
new file mode 100644
index 00000000..3d68563b
--- /dev/null
+++ b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/StoredProceduresSpec.scala
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2013 Maurício Linhares
+ *
+ * Maurício Linhares licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.mauricio.async.db.mysql
+
+import com.github.mauricio.async.db.ResultSet
+import com.github.mauricio.async.db.util.FutureUtils._
+import org.specs2.mutable.Specification
+
+class StoredProceduresSpec extends Specification with ConnectionHelper {
+
+ "connection" should {
+
+ "be able to execute create stored procedure" in {
+ withConnection {
+ connection =>
+ val future = for(
+ drop <- connection.sendQuery("DROP PROCEDURE IF exists helloWorld;");
+ create <- connection.sendQuery(
+ """
+ CREATE PROCEDURE helloWorld(OUT param1 VARCHAR(20))
+ BEGIN
+ SELECT 'hello' INTO param1;
+ END
+ """
+ )
+ ) yield create
+ awaitFuture(future).statusMessage === ""
+ }
+ }
+
+ "be able to call stored procedure" in {
+ withConnection {
+ connection =>
+ val future = for(
+ drop <- connection.sendQuery("DROP PROCEDURE IF exists constTest;");
+ create <- connection.sendQuery(
+ """
+ CREATE PROCEDURE constTest(OUT param INT)
+ BEGIN
+ SELECT 125 INTO param;
+ END
+ """
+ );
+ call <- connection.sendQuery("CALL constTest(@arg)");
+ arg <- connection.sendQuery("SELECT @arg")
+ ) yield arg
+ val result: Option[ResultSet] = awaitFuture(future).rows
+ result.isDefined === true
+ val rows = result.get
+ rows.size === 1
+ rows(0)(rows.columnNames.head) === 125
+ }
+ }
+
+ "be able to call stored procedure with input parameter" in {
+ withConnection {
+ connection =>
+ val future = for(
+ drop <- connection.sendQuery("DROP PROCEDURE IF exists addTest;");
+ create <- connection.sendQuery(
+ """
+ CREATE PROCEDURE addTest(IN a INT, IN b INT, OUT sum INT)
+ BEGIN
+ SELECT a+b INTO sum;
+ END
+ """
+ );
+ call <- connection.sendQuery("CALL addTest(132, 245, @sm)");
+ res <- connection.sendQuery("SELECT @sm")
+ ) yield res
+ val result: Option[ResultSet] = awaitFuture(future).rows
+ result.isDefined === true
+ val rows = result.get
+ rows.size === 1
+ rows(0)(rows.columnNames.head) === 377
+ }
+ }
+
+ "be able to remove stored procedure" in {
+ withConnection {
+ connection =>
+ val createResult: Option[ResultSet] = awaitFuture(
+ for(
+ drop <- connection.sendQuery("DROP PROCEDURE IF exists remTest;");
+ create <- connection.sendQuery(
+ """
+ CREATE PROCEDURE remTest(OUT cnst INT)
+ BEGIN
+ SELECT 987 INTO cnst;
+ END
+ """
+ );
+ routine <- connection.sendQuery(
+ """
+ SELECT routine_name FROM INFORMATION_SCHEMA.ROUTINES WHERE routine_name="remTest"
+ """
+ )
+ ) yield routine
+ ).rows
+ createResult.isDefined === true
+ createResult.get.size === 1
+ createResult.get(0)("routine_name") === "remTest"
+ val removeResult: Option[ResultSet] = awaitFuture(
+ for(
+ drop <- connection.sendQuery("DROP PROCEDURE remTest;");
+ routine <- connection.sendQuery(
+ """
+ SELECT routine_name FROM INFORMATION_SCHEMA.ROUTINES WHERE routine_name="remTest"
+ """
+ )
+ ) yield routine
+ ).rows
+ removeResult.isDefined === true
+ removeResult.get.isEmpty === true
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/TransactionSpec.scala b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/TransactionSpec.scala
index 0312f0d5..0ef2f86b 100644
--- a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/TransactionSpec.scala
+++ b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/TransactionSpec.scala
@@ -1,14 +1,28 @@
package com.github.mauricio.async.db.mysql
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
import org.specs2.mutable.Specification
import com.github.mauricio.async.db.util.FutureUtils.awaitFuture
import com.github.mauricio.async.db.mysql.exceptions.MySQLException
import com.github.mauricio.async.db.Connection
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.{Success, Failure}
+
+object TransactionSpec {
+
+ val BrokenInsert = """INSERT INTO users (id, name) VALUES (1, 'Maurício Aragão')"""
+ val InsertUser = """INSERT INTO users (name) VALUES (?)"""
+ val TransactionInsert = "insert into transaction_test (id) values (?)"
+
+}
+
class TransactionSpec extends Specification with ConnectionHelper {
- val brokenInsert = """INSERT INTO users (id, name) VALUES (1, 'Maurício Aragão')"""
- val insertUser = """INSERT INTO users (name) VALUES (?)"""
+ import TransactionSpec._
"connection in transaction" should {
@@ -42,7 +56,7 @@ class TransactionSpec extends Specification with ConnectionHelper {
val future = connection.inTransaction {
c =>
- c.sendQuery(this.insert).flatMap(r => c.sendQuery(brokenInsert))
+ c.sendQuery(this.insert).flatMap(r => c.sendQuery(BrokenInsert))
}
try {
@@ -77,7 +91,7 @@ class TransactionSpec extends Specification with ConnectionHelper {
val future = pool.inTransaction {
c =>
connection = c
- c.sendQuery(this.brokenInsert)
+ c.sendQuery(BrokenInsert)
}
try {
@@ -97,6 +111,38 @@ class TransactionSpec extends Specification with ConnectionHelper {
}
+ "runs commands for a transaction in a single connection" in {
+
+ val id = UUID.randomUUID().toString
+
+ withPool {
+ pool =>
+ val operations = pool.inTransaction {
+ connection =>
+ connection.sendPreparedStatement(TransactionInsert, List(id)).flatMap {
+ result =>
+ connection.sendPreparedStatement(TransactionInsert, List(id)).map {
+ failure =>
+ List(result, failure)
+ }
+ }
+ }
+
+ Await.ready(operations, Duration(5, TimeUnit.SECONDS))
+
+ operations.value.get match {
+ case Success(e) => failure("should not have executed")
+ case Failure(e) => {
+ e.asInstanceOf[MySQLException].errorMessage.errorCode === 1062
+ executePreparedStatement(pool, "select * from transaction_test where id = ?", id).rows.get.size === 0
+ success("ok")
+ }
+ }
+
+ }
+
+ }
+
}
}
diff --git a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoderSpec.scala b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/encoder/PreparedStatementExecuteEncoderSpec.scala
similarity index 62%
rename from mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoderSpec.scala
rename to mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/encoder/PreparedStatementExecuteEncoderSpec.scala
index 78bce249..427dde17 100644
--- a/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoderSpec.scala
+++ b/mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/encoder/PreparedStatementExecuteEncoderSpec.scala
@@ -14,28 +14,29 @@
* under the License.
*/
-package com.github.mauricio.async.db.mysql.binary
+package com.github.mauricio.async.db.mysql.encoder
-import org.specs2.mutable.Specification
+import com.github.mauricio.async.db.mysql.binary.BinaryRowEncoder
import io.netty.util.CharsetUtil
+import org.specs2.mutable.Specification
-class BinaryRowEncoderSpec extends Specification {
+class PreparedStatementExecuteEncoderSpec extends Specification {
- val encoder = new BinaryRowEncoder(CharsetUtil.UTF_8)
+ val encoder = new PreparedStatementExecuteEncoder(new BinaryRowEncoder(CharsetUtil.UTF_8))
"binary row encoder" should {
"encode Some(value) like value" in {
- val actual = encoder.encode(List(Some(1l), Some("foo")))
- val expected = encoder.encode(List(1l, "foo"))
+ val actual = encoder.encodeValues(List(Some(1l), Some("foo")), Set(0, 1))
+ val expected = encoder.encodeValues(List(1l, "foo"), Set(0, 1))
actual mustEqual expected
}
"encode None as null" in {
- val actual = encoder.encode(List(None))
- val expected = encoder.encode(List(null))
+ val actual = encoder.encodeValues(List(None), Set(0))
+ val expected = encoder.encodeValues(List(null), Set(0))
actual mustEqual expected
}
diff --git a/postgresql-async/README.md b/postgresql-async/README.md
index 384cb647..7702e907 100644
--- a/postgresql-async/README.md
+++ b/postgresql-async/README.md
@@ -81,6 +81,8 @@ BigInteger | numeric
BigDecimal | numeric
String | varchar
Array[Byte] | bytea (PostgreSQL 9.0 and above only)
+java.nio.ByteBuffer | bytea (PostgreSQL 9.0 and above only)
+io.netty.buffer.ByteBuf | bytea (PostgreSQL 9.0 and above only)
java.util.Date | timestamp_with_timezone
java.sql.Timestamp | timestamp_with_timezone
java.sql.Date | date
diff --git a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala
index 054bab50..8c58076b 100644
--- a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala
+++ b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala
@@ -20,6 +20,7 @@ import com.github.mauricio.async.db.QueryResult
import com.github.mauricio.async.db.column.{ColumnEncoderRegistry, ColumnDecoderRegistry}
import com.github.mauricio.async.db.exceptions.{InsufficientParametersException, ConnectionStillRunningQueryException}
import com.github.mauricio.async.db.general.MutableResultSet
+import com.github.mauricio.async.db.pool.TimeoutScheduler
import com.github.mauricio.async.db.postgresql.codec.{PostgreSQLConnectionDelegate, PostgreSQLConnectionHandler}
import com.github.mauricio.async.db.postgresql.column.{PostgreSQLColumnDecoderRegistry, PostgreSQLColumnEncoderRegistry}
import com.github.mauricio.async.db.postgresql.exceptions._
@@ -45,10 +46,11 @@ class PostgreSQLConnection
encoderRegistry: ColumnEncoderRegistry = PostgreSQLColumnEncoderRegistry.Instance,
decoderRegistry: ColumnDecoderRegistry = PostgreSQLColumnDecoderRegistry.Instance,
group : EventLoopGroup = NettyUtils.DefaultEventLoopGroup,
- executionContext : ExecutionContext = ExecutorServiceUtils.CachedExecutionContext
+ implicit val executionContext : ExecutionContext = ExecutorServiceUtils.CachedExecutionContext
)
extends PostgreSQLConnectionDelegate
- with Connection {
+ with Connection
+ with TimeoutScheduler {
import PostgreSQLConnection._
@@ -63,7 +65,6 @@ class PostgreSQLConnection
private final val currentCount = Counter.incrementAndGet()
private final val preparedStatementsCounter = new AtomicInteger()
- private final implicit val internalExecutionContext = executionContext
private val parameterStatus = new scala.collection.mutable.HashMap[String, String]()
private val parsedStatements = new scala.collection.mutable.HashMap[String, PreparedStatementHolder]()
@@ -80,6 +81,7 @@ class PostgreSQLConnection
private var queryResult: Option[QueryResult] = None
+ override def eventLoopGroup : EventLoopGroup = group
def isReadyForQuery: Boolean = this.queryPromise.isEmpty
def connect: Future[Connection] = {
@@ -91,6 +93,7 @@ class PostgreSQLConnection
}
override def disconnect: Future[Connection] = this.connectionHandler.disconnect.map( c => this )
+ override def onTimeout = disconnect
override def isConnected: Boolean = this.connectionHandler.isConnected
@@ -103,7 +106,7 @@ class PostgreSQLConnection
this.setQueryPromise(promise)
write(new QueryMessage(query))
-
+ addTimeout(promise,configuration.queryTimeout)
promise.future
}
@@ -130,7 +133,7 @@ class PostgreSQLConnection
holder.prepared = true
new PreparedStatementOpeningMessage(holder.statementId, holder.realQuery, values, this.encoderRegistry)
})
-
+ addTimeout(promise,configuration.queryTimeout)
promise.future
}
@@ -303,6 +306,7 @@ class PostgreSQLConnection
private def succeedQueryPromise(result: QueryResult) {
this.queryResult = None
+ this.currentQuery = None
this.clearQueryPromise.foreach {
_.success(result)
}
diff --git a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/ByteArrayEncoderDecoder.scala b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/ByteArrayEncoderDecoder.scala
index bfaed46e..2ae1e7a4 100644
--- a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/ByteArrayEncoderDecoder.scala
+++ b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/ByteArrayEncoderDecoder.scala
@@ -18,7 +18,10 @@ package com.github.mauricio.async.db.postgresql.column
import com.github.mauricio.async.db.column.ColumnEncoderDecoder
import com.github.mauricio.async.db.postgresql.exceptions.ByteArrayFormatNotSupportedException
-import com.github.mauricio.async.db.util.{Log, HexCodec}
+import com.github.mauricio.async.db.util.{ Log, HexCodec }
+import java.nio.ByteBuffer
+
+import io.netty.buffer.ByteBuf
object ByteArrayEncoderDecoder extends ColumnEncoderDecoder {
@@ -31,13 +34,72 @@ object ByteArrayEncoderDecoder extends ColumnEncoderDecoder {
if (value.startsWith(HexStart)) {
HexCodec.decode(value, 2)
} else {
- throw new ByteArrayFormatNotSupportedException()
+ // Default encoding is 'escape'
+
+ // Size the buffer to the length of the string, the data can't be bigger
+ val buffer = ByteBuffer.allocate(value.length)
+
+ val ci = value.iterator
+
+ while (ci.hasNext) {
+ ci.next match {
+ case '\\' ⇒ getCharOrDie(ci) match {
+ case '\\' ⇒ buffer.put('\\'.toByte)
+ case firstDigit ⇒
+ val secondDigit = getCharOrDie(ci)
+ val thirdDigit = getCharOrDie(ci)
+ // Must always be in triplets
+ buffer.put(
+ Integer.decode(
+ new String(Array('0', firstDigit, secondDigit, thirdDigit))).toByte)
+ }
+ case c ⇒ buffer.put(c.toByte)
+ }
+ }
+
+ buffer.flip
+ val finalArray = new Array[Byte](buffer.remaining())
+ buffer.get(finalArray)
+
+ finalArray
}
}
+ /**
+ * This is required since {@link Iterator#next} when {@linke Iterator#hasNext} is false is undefined.
+ * @param ci the iterator source of the data
+ * @return the next character
+ * @throws IllegalArgumentException if there is no next character
+ */
+ private [this] def getCharOrDie(ci: Iterator[Char]): Char = {
+ if (ci.hasNext) {
+ ci.next()
+ } else {
+ throw new IllegalArgumentException("Expected escape sequence character, found nothing")
+ }
+ }
+
override def encode(value: Any): String = {
- HexCodec.encode(value.asInstanceOf[Array[Byte]], HexStartChars)
+ val array = value match {
+ case byteArray: Array[Byte] => byteArray
+
+ case byteBuffer: ByteBuffer if byteBuffer.hasArray => byteBuffer.array()
+
+ case byteBuffer: ByteBuffer =>
+ val arr = new Array[Byte](byteBuffer.remaining())
+ byteBuffer.get(arr)
+ arr
+
+ case byteBuf: ByteBuf if byteBuf.hasArray => byteBuf.array()
+
+ case byteBuf: ByteBuf =>
+ val arr = new Array[Byte](byteBuf.readableBytes())
+ byteBuf.getBytes(0, arr)
+ arr
+ }
+
+ HexCodec.encode(array, HexStartChars)
}
}
diff --git a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/ColumnTypes.scala b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/ColumnTypes.scala
index 7f15b0f6..29c6b736 100644
--- a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/ColumnTypes.scala
+++ b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/ColumnTypes.scala
@@ -63,6 +63,7 @@ object ColumnTypes {
final val MoneyArray = 791
final val NameArray = 1003
+ final val UUID = 2950
final val UUIDArray = 2951
final val XMLArray = 143
diff --git a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/PostgreSQLColumnDecoderRegistry.scala b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/PostgreSQLColumnDecoderRegistry.scala
index 734c0902..606bb442 100644
--- a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/PostgreSQLColumnDecoderRegistry.scala
+++ b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/PostgreSQLColumnDecoderRegistry.scala
@@ -45,6 +45,7 @@ class PostgreSQLColumnDecoderRegistry( charset : Charset = CharsetUtil.UTF_8 ) e
private final val timeArrayDecoder = new ArrayDecoder(TimeEncoderDecoder.Instance)
private final val timeWithTimestampArrayDecoder = new ArrayDecoder(TimeWithTimezoneEncoderDecoder)
private final val intervalArrayDecoder = new ArrayDecoder(PostgreSQLIntervalEncoderDecoder)
+ private final val uuidArrayDecoder = new ArrayDecoder(UUIDEncoderDecoder)
override def decode(kind: ColumnData, value: ByteBuf, charset: Charset): Any = {
decoderFor(kind.dataType).decode(kind, value, charset)
@@ -108,7 +109,8 @@ class PostgreSQLColumnDecoderRegistry( charset : Charset = CharsetUtil.UTF_8 ) e
case MoneyArray => this.stringArrayDecoder
case NameArray => this.stringArrayDecoder
- case UUIDArray => this.stringArrayDecoder
+ case UUID => UUIDEncoderDecoder
+ case UUIDArray => this.uuidArrayDecoder
case XMLArray => this.stringArrayDecoder
case ByteA => ByteArrayEncoderDecoder
diff --git a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/PostgreSQLColumnEncoderRegistry.scala b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/PostgreSQLColumnEncoderRegistry.scala
index e09a2aed..24641336 100644
--- a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/PostgreSQLColumnEncoderRegistry.scala
+++ b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/column/PostgreSQLColumnEncoderRegistry.scala
@@ -16,7 +16,10 @@
package com.github.mauricio.async.db.postgresql.column
+import java.nio.ByteBuffer
+
import com.github.mauricio.async.db.column._
+import io.netty.buffer.ByteBuf
import org.joda.time._
import scala.collection.JavaConversions._
@@ -49,6 +52,8 @@ class PostgreSQLColumnEncoderRegistry extends ColumnEncoderRegistry {
classOf[BigDecimal] -> (BigDecimalEncoderDecoder -> ColumnTypes.Numeric),
classOf[java.math.BigDecimal] -> (BigDecimalEncoderDecoder -> ColumnTypes.Numeric),
+ classOf[java.util.UUID] -> (UUIDEncoderDecoder -> ColumnTypes.UUID),
+
classOf[LocalDate] -> ( DateEncoderDecoder -> ColumnTypes.Date ),
classOf[LocalDateTime] -> (TimestampEncoderDecoder.Instance -> ColumnTypes.Timestamp),
classOf[DateTime] -> (TimestampWithTimezoneEncoderDecoder -> ColumnTypes.TimestampWithTimezone),
@@ -64,7 +69,9 @@ class PostgreSQLColumnEncoderRegistry extends ColumnEncoderRegistry {
classOf[java.sql.Timestamp] -> (TimestampWithTimezoneEncoderDecoder -> ColumnTypes.TimestampWithTimezone),
classOf[java.util.Calendar] -> (TimestampWithTimezoneEncoderDecoder -> ColumnTypes.TimestampWithTimezone),
classOf[java.util.GregorianCalendar] -> (TimestampWithTimezoneEncoderDecoder -> ColumnTypes.TimestampWithTimezone),
- classOf[Array[Byte]] -> ( ByteArrayEncoderDecoder -> ColumnTypes.ByteA )
+ classOf[Array[Byte]] -> ( ByteArrayEncoderDecoder -> ColumnTypes.ByteA ),
+ classOf[ByteBuffer] -> ( ByteArrayEncoderDecoder -> ColumnTypes.ByteA ),
+ classOf[ByteBuf] -> ( ByteArrayEncoderDecoder -> ColumnTypes.ByteA )
)
private final val classesSequence = (classOf[LocalTime] -> (TimeEncoderDecoder.Instance -> ColumnTypes.Time)) ::
@@ -131,7 +138,7 @@ class PostgreSQLColumnEncoderRegistry extends ColumnEncoderRegistry {
"NULL"
} else {
if (this.shouldQuote(item)) {
- "\"" + this.encode(item).replaceAllLiterally("\"", """\"""") + "\""
+ "\"" + this.encode(item).replaceAllLiterally("\\", """\\""").replaceAllLiterally("\"", """\"""") + "\""
} else {
this.encode(item)
}
diff --git a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/pool/PostgreSQLConnectionFactory.scala b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/pool/PostgreSQLConnectionFactory.scala
index 62bcfd1a..ae3c5255 100644
--- a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/pool/PostgreSQLConnectionFactory.scala
+++ b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/pool/PostgreSQLConnectionFactory.scala
@@ -17,6 +17,7 @@
package com.github.mauricio.async.db.postgresql.pool
import com.github.mauricio.async.db.Configuration
+import com.github.mauricio.async.db.exceptions.ConnectionTimeoutedException
import com.github.mauricio.async.db.pool.ObjectFactory
import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.util.Log
@@ -69,6 +70,9 @@ class PostgreSQLConnectionFactory(
def validate( item : PostgreSQLConnection ) : Try[PostgreSQLConnection] = {
Try {
+ if ( item.isTimeouted ) {
+ throw new ConnectionTimeoutedException(item)
+ }
if ( !item.isConnected || item.hasRecentError ) {
throw new ClosedChannelException()
}
diff --git a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/ArrayTypesSpec.scala b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/ArrayTypesSpec.scala
index 7396aeb3..e941e145 100644
--- a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/ArrayTypesSpec.scala
+++ b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/ArrayTypesSpec.scala
@@ -34,7 +34,7 @@ class ArrayTypesSpec extends Specification with DatabaseTestHelper {
(smallint_column, text_column, timestamp_column)
values (
'{1,2,3,4}',
- '{"some,\"comma,separated,text","another line of text",NULL}',
+ '{"some,\"comma,separated,text","another line of text","fake\,backslash","real\\,backslash\\",NULL}',
'{"2013-04-06 01:15:10.528-03","2013-04-06 01:15:08.528-03"}'
)"""
@@ -52,7 +52,7 @@ class ArrayTypesSpec extends Specification with DatabaseTestHelper {
executeDdl(handler, insert, 1)
val result = executeQuery(handler, "select * from type_test_table").rows.get
result(0)("smallint_column") === List(1,2,3,4)
- result(0)("text_column") === List("some,\"comma,separated,text", "another line of text", null )
+ result(0)("text_column") === List("some,\"comma,separated,text", "another line of text", "fake,backslash", "real\\,backslash\\", null )
result(0)("timestamp_column") === List(
TimestampWithTimezoneEncoderDecoder.decode("2013-04-06 01:15:10.528-03"),
TimestampWithTimezoneEncoderDecoder.decode("2013-04-06 01:15:08.528-03")
@@ -68,7 +68,7 @@ class ArrayTypesSpec extends Specification with DatabaseTestHelper {
TimestampWithTimezoneEncoderDecoder.decode("2013-04-06 01:15:08.528-03")
)
val numbers = List(1,2,3,4)
- val texts = List("some,\"comma,separated,text", "another line of text", null )
+ val texts = List("some,\"comma,separated,text", "another line of text", "fake,backslash", "real\\,backslash\\", null )
withHandler {
handler =>
diff --git a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnectionSpec.scala b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnectionSpec.scala
index 93244111..2843e95e 100644
--- a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnectionSpec.scala
+++ b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnectionSpec.scala
@@ -14,20 +14,22 @@
* under the License.
*/
-package com.github.mauricio.postgresql
+package com.github.mauricio.async.db.postgresql
-import com.github.mauricio.async.db.column.{TimestampEncoderDecoder, TimeEncoderDecoder, DateEncoderDecoder}
+import java.nio.ByteBuffer
+
+import com.github.mauricio.async.db.column.{DateEncoderDecoder, TimeEncoderDecoder, TimestampEncoderDecoder}
import com.github.mauricio.async.db.exceptions.UnsupportedAuthenticationMethodException
-import com.github.mauricio.async.db.postgresql.exceptions.{QueryMustNotBeNullOrEmptyException, GenericDatabaseException}
+import com.github.mauricio.async.db.postgresql.exceptions.{GenericDatabaseException, QueryMustNotBeNullOrEmptyException}
import com.github.mauricio.async.db.postgresql.messages.backend.InformationMessage
-import com.github.mauricio.async.db.postgresql.{PostgreSQLConnection, DatabaseTestHelper}
import com.github.mauricio.async.db.util.Log
-import com.github.mauricio.async.db.{Configuration, QueryResult, Connection}
-import concurrent.{Future, Await}
+import com.github.mauricio.async.db.{Configuration, Connection, QueryResult}
+import io.netty.buffer.Unpooled
+import org.joda.time.LocalDateTime
import org.specs2.mutable.Specification
-import scala.concurrent.ExecutionContext.Implicits.global
+
import scala.concurrent.duration._
-import org.joda.time.LocalDateTime
+import scala.concurrent.{Await, Future}
object PostgreSQLConnectionSpec {
val log = Log.get[PostgreSQLConnectionSpec]
@@ -282,16 +284,12 @@ class PostgreSQLConnectionSpec extends Specification with DatabaseTestHelper {
try {
withHandler(configuration, {
handler =>
- executeQuery(handler, "SELECT 0")
- throw new IllegalStateException("should not have come here")
+ val result = executeQuery(handler, "SELECT 0")
+ throw new IllegalStateException("should not have arrived")
})
} catch {
- case e: GenericDatabaseException => {
+ case e: GenericDatabaseException =>
e.errorMessage.fields(InformationMessage.Routine) === "auth_failed"
- }
- case e: Exception => {
- throw new IllegalStateException("should not have come here")
- }
}
}
@@ -406,10 +404,14 @@ class PostgreSQLConnectionSpec extends Specification with DatabaseTestHelper {
executeDdl(handler, create)
log.debug("executed create")
executePreparedStatement(handler, insert, Array( sampleArray ))
+ executePreparedStatement(handler, insert, Array( ByteBuffer.wrap(sampleArray) ))
+ executePreparedStatement(handler, insert, Array( Unpooled.copiedBuffer(sampleArray) ))
log.debug("executed prepared statement")
val rows = executeQuery(handler, select).rows.get
rows(0)("content").asInstanceOf[Array[Byte]] === sampleArray
+ rows(1)("content").asInstanceOf[Array[Byte]] === sampleArray
+ rows(2)("content").asInstanceOf[Array[Byte]] === sampleArray
}
}
@@ -428,6 +430,20 @@ class PostgreSQLConnectionSpec extends Specification with DatabaseTestHelper {
}
+ "insert without return after select" in {
+
+ withHandler {
+ handler =>
+ executeDdl(handler, this.preparedStatementCreate)
+ executeDdl(handler, this.preparedStatementInsert, 1)
+ executeDdl(handler, this.preparedStatementSelect, 1)
+ val result = executeQuery(handler, this.preparedStatementInsert2)
+
+ result.rows === None
+ }
+
+ }
+
}
}
diff --git a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PreparedStatementSpec.scala b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PreparedStatementSpec.scala
index 20c645cc..6fd7d9a6 100644
--- a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PreparedStatementSpec.scala
+++ b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PreparedStatementSpec.scala
@@ -20,7 +20,7 @@ import org.specs2.mutable.Specification
import org.joda.time.LocalDate
import com.github.mauricio.async.db.util.Log
import com.github.mauricio.async.db.exceptions.InsufficientParametersException
-import java.util.Date
+import java.util.UUID
import com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException
class PreparedStatementSpec extends Specification with DatabaseTestHelper {
@@ -282,6 +282,61 @@ class PreparedStatementSpec extends Specification with DatabaseTestHelper {
}
}
+ "support UUID" in {
+ if ( System.getenv("TRAVIS") == null ) {
+ withHandler {
+ handler =>
+ val create = """create temp table uuids
+ |(
+ |id bigserial primary key,
+ |my_id uuid
+ |);""".stripMargin
+
+ val insert = "INSERT INTO uuids (my_id) VALUES (?) RETURNING id"
+ val select = "SELECT * FROM uuids"
+
+ val uuid = UUID.randomUUID()
+
+ executeDdl(handler, create)
+ executePreparedStatement(handler, insert, Array(uuid) )
+ val result = executePreparedStatement(handler, select).rows.get
+
+ result(0)("my_id").asInstanceOf[UUID] === uuid
+ }
+ success
+ } else {
+ pending
+ }
+ }
+
+ "support UUID array" in {
+ if ( System.getenv("TRAVIS") == null ) {
+ withHandler {
+ handler =>
+ val create = """create temp table uuids
+ |(
+ |id bigserial primary key,
+ |my_id uuid[]
+ |);""".stripMargin
+
+ val insert = "INSERT INTO uuids (my_id) VALUES (?) RETURNING id"
+ val select = "SELECT * FROM uuids"
+
+ val uuid1 = UUID.randomUUID()
+ val uuid2 = UUID.randomUUID()
+
+ executeDdl(handler, create)
+ executePreparedStatement(handler, insert, Array(Array(uuid1, uuid2)) )
+ val result = executePreparedStatement(handler, select).rows.get
+
+ result(0)("my_id").asInstanceOf[Seq[UUID]] === Seq(uuid1, uuid2)
+ }
+ success
+ } else {
+ pending
+ }
+ }
+
}
}
diff --git a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/TimeAndDateSpec.scala b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/TimeAndDateSpec.scala
index e671a5b4..67e7b877 100644
--- a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/TimeAndDateSpec.scala
+++ b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/TimeAndDateSpec.scala
@@ -35,7 +35,7 @@ class TimeAndDateSpec extends Specification with DatabaseTestHelper {
)"""
executeDdl(handler, create)
- executeQuery(handler, "INSERT INTO messages (moment) VALUES ('04:05:06')")
+ executePreparedStatement(handler, "INSERT INTO messages (moment) VALUES (?)", Array[Any](new LocalTime(4, 5, 6)))
val rows = executePreparedStatement(handler, "select * from messages").rows.get
@@ -60,7 +60,7 @@ class TimeAndDateSpec extends Specification with DatabaseTestHelper {
)"""
executeDdl(handler, create)
- executeQuery(handler, "INSERT INTO messages (moment) VALUES ('04:05:06.134')")
+ executePreparedStatement(handler, "INSERT INTO messages (moment) VALUES (?)", Array[Any](new LocalTime(4, 5, 6, 134)))
val rows = executePreparedStatement(handler, "select * from messages").rows.get
@@ -200,6 +200,22 @@ class TimeAndDateSpec extends Specification with DatabaseTestHelper {
}
+ "handle sending a LocalDateTime and return a LocalDateTime for a timestamp without timezone column" in {
+
+ withTimeHandler {
+ conn =>
+ val date1 = new LocalDateTime(2190319)
+
+ await(conn.sendPreparedStatement("CREATE TEMP TABLE TEST(T TIMESTAMP)"))
+ await(conn.sendPreparedStatement("INSERT INTO TEST(T) VALUES(?)", Seq(date1)))
+ val result = await(conn.sendPreparedStatement("SELECT T FROM TEST"))
+ val date2 = result.rows.get.head(0)
+
+ date2 === date1
+ }
+
+ }
+
"handle sending a date with timezone and retrieving the date with the same time zone" in {
withTimeHandler {
diff --git a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/column/ByteArrayDecoderSpec.scala b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/column/ByteArrayDecoderSpec.scala
new file mode 100644
index 00000000..328e872f
--- /dev/null
+++ b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/column/ByteArrayDecoderSpec.scala
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2013 Maurício Linhares
+ *
+ * Maurício Linhares licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.github.mauricio.async.db.postgresql.column
+
+import org.specs2.mutable.Specification
+
+class ByteArrayDecoderSpec extends Specification {
+
+ val escapeTestData =
+ """\000\001\002\003\004\005\006\007\010\011\012\013\014\015\016\017\020\021\022\023\024\025\026\027""" +
+ """\030\031\032\033\034\035\036\037 !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^""" +
+ """_`abcdefghijklmnopqrstuvwxyz{|}~\177\200\201\202\203\204\205\206\207\210\211\212\213\214\215\216""" +
+ """\217\220\221\222\223\224\225\226\227\230\231\232\233\234\235\236\237\240\241\242\243\244\245\246""" +
+ """\247\250\251\252\253\254\255\256\257\260\261\262\263\264\265\266\267\270\271\272\273\274\275\276""" +
+ """\277\300\301\302\303\304\305\306\307\310\311\312\313\314\315\316\317\320\321\322\323\324\325\326""" +
+ """\327\330\331\332\333\334\335\336\337\340\341\342\343\344\345\346\347\350\351\352\353\354\355\356""" +
+ """\357\360\361\362\363\364\365\366\367\370\371\372\373\374\375\376\377\377\376\375\374\373\372\371""" +
+ """\370\367\366\365\364\363\362\361\360\357\356\355\354\353\352\351\350\347\346\345\344\343\342\341""" +
+ """\340\337\336\335\334\333\332\331\330\327\326\325\324\323\322\321\320\317\316\315\314\313\312\311""" +
+ """\310\307\306\305\304\303\302\301\300\277\276\275\274\273\272\271\270\267\266\265\264\263\262\261""" +
+ """\260\257\256\255\254\253\252\251\250\247\246\245\244\243\242\241\240\237\236\235\234\233\232\231""" +
+ """\230\227\226\225\224\223\222\221\220\217\216\215\214\213\212\211\210\207\206\205\204\203\202\201""" +
+ """\200\177~}|{zyxwvutsrqponmlkjihgfedcba`_^]\\[ZYXWVUTSRQPONMLKJIHGFEDCBA@?>=<;:9876543210/.-,+*)(""" +
+ """'&%$#"! \037\036\035\034\033\032\031\030\027\026\025\024\023\022\021\020\017\016\015\014\013\012""" +
+ """\011\010\007\006\005\004\003\002\001\000"""
+
+ val hexTestData =
+ """\x000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e""" +
+ """2f303132333435363738393a3b3c3d3e3f404142434445464748494a4b4c4d4e4f505152535455565758595a5b5c5d5e""" +
+ """5f606162636465666768696a6b6c6d6e6f707172737475767778797a7b7c7d7e7f808182838485868788898a8b8c8d8e""" +
+ """8f909192939495969798999a9b9c9d9e9fa0a1a2a3a4a5a6a7a8a9aaabacadaeafb0b1b2b3b4b5b6b7b8b9babbbcbdbe""" +
+ """bfc0c1c2c3c4c5c6c7c8c9cacbcccdcecfd0d1d2d3d4d5d6d7d8d9dadbdcdddedfe0e1e2e3e4e5e6e7e8e9eaebecedee""" +
+ """eff0f1f2f3f4f5f6f7f8f9fafbfcfdfefffffefdfcfbfaf9f8f7f6f5f4f3f2f1f0efeeedecebeae9e8e7e6e5e4e3e2e1""" +
+ """e0dfdedddcdbdad9d8d7d6d5d4d3d2d1d0cfcecdcccbcac9c8c7c6c5c4c3c2c1c0bfbebdbcbbbab9b8b7b6b5b4b3b2b1""" +
+ """b0afaeadacabaaa9a8a7a6a5a4a3a2a1a09f9e9d9c9b9a999897969594939291908f8e8d8c8b8a898887868584838281""" +
+ """807f7e7d7c7b7a797877767574737271706f6e6d6c6b6a696867666564636261605f5e5d5c5b5a595857565554535251""" +
+ """504f4e4d4c4b4a494847464544434241403f3e3d3c3b3a393837363534333231302f2e2d2c2b2a292827262524232221""" +
+ """201f1e1d1c1b1a191817161514131211100f0e0d0c0b0a09080706050403020100"""
+
+ val originalData = ((0 to 255) ++ ((0 to 255).reverse)).map(_.toByte).toArray
+
+ "decoder" should {
+
+ "parse escape data" in {
+ ByteArrayEncoderDecoder.decode(escapeTestData) === originalData
+ }
+
+ "parse hex data" in {
+ ByteArrayEncoderDecoder.decode(hexTestData) === originalData
+ }
+ }
+
+}
diff --git a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/column/DefaultColumnEncoderRegistrySpec.scala b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/column/DefaultColumnEncoderRegistrySpec.scala
index 88965d49..1b41f447 100644
--- a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/column/DefaultColumnEncoderRegistrySpec.scala
+++ b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/column/DefaultColumnEncoderRegistrySpec.scala
@@ -26,7 +26,7 @@ class DefaultColumnEncoderRegistrySpec extends Specification {
"correctly render an array of strings with nulls" in {
val items = Array( "some", """text \ hoes " here to be seen""", null, "all, right" )
- registry.encode( items ) === """{"some","text \ hoes \" here to be seen",NULL,"all, right"}"""
+ registry.encode( items ) === """{"some","text \\ hoes \" here to be seen",NULL,"all, right"}"""
}
"correctly render an array of numbers" in {
diff --git a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/pool/ConnectionPoolSpec.scala b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/pool/ConnectionPoolSpec.scala
index 02295b16..b71ebe65 100644
--- a/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/pool/ConnectionPoolSpec.scala
+++ b/postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/pool/ConnectionPoolSpec.scala
@@ -16,12 +16,21 @@
package com.github.mauricio.async.db.postgresql.pool
+import java.util.UUID
+
import com.github.mauricio.async.db.pool.{ConnectionPool, PoolConfiguration}
+import com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException
import com.github.mauricio.async.db.postgresql.{PostgreSQLConnection, DatabaseTestHelper}
import org.specs2.mutable.Specification
+object ConnectionPoolSpec {
+ val Insert = "insert into transaction_test (id) values (?)"
+}
+
class ConnectionPoolSpec extends Specification with DatabaseTestHelper {
+ import ConnectionPoolSpec.Insert
+
"pool" should {
"give you a connection when sending statements" in {
@@ -51,6 +60,29 @@ class ConnectionPoolSpec extends Specification with DatabaseTestHelper {
}
}
+ "runs commands for a transaction in a single connection" in {
+
+ val id = UUID.randomUUID().toString
+
+ withPool {
+ pool =>
+ val operations = pool.inTransaction {
+ connection =>
+ connection.sendPreparedStatement(Insert, List(id)).flatMap {
+ result =>
+ connection.sendPreparedStatement(Insert, List(id)).map {
+ failure =>
+ List(result, failure)
+ }
+ }
+ }
+
+ await(operations) must throwA[GenericDatabaseException]
+
+ }
+
+ }
+
}
def withPool[R]( fn : (ConnectionPool[PostgreSQLConnection]) => R ) : R = {
diff --git a/project/Build.scala b/project/Build.scala
index c2138638..bb361625 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -45,18 +45,18 @@ object ProjectBuild extends Build {
object Configuration {
- val commonVersion = "0.2.15-SNAPSHOT"
- val projectScalaVersion = "2.11.0"
+ val commonVersion = "0.2.19-SNAPSHOT"
+ val projectScalaVersion = "2.11.7"
val specs2Dependency = "org.specs2" %% "specs2" % "2.3.11" % "test"
- val logbackDependency = "ch.qos.logback" % "logback-classic" % "1.0.13" % "test"
+ val logbackDependency = "ch.qos.logback" % "logback-classic" % "1.1.3" % "test"
val commonDependencies = Seq(
- "org.slf4j" % "slf4j-api" % "1.7.5",
+ "org.slf4j" % "slf4j-api" % "1.7.12",
"joda-time" % "joda-time" % "2.3",
"org.joda" % "joda-convert" % "1.5",
- "io.netty" % "netty-all" % "4.0.23.Final",
- "org.javassist" % "javassist" % "3.18.1-GA",
+ "io.netty" % "netty-all" % "4.0.29.Final",
+ "org.javassist" % "javassist" % "3.20.0-GA",
specs2Dependency,
logbackDependency
)
diff --git a/project/build.properties b/project/build.properties
index 8ac605a3..d638b4f3 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=0.13.2
+sbt.version = 0.13.8
\ No newline at end of file
diff --git a/script/prepare_build.sh b/script/prepare_build.sh
index 9992e442..96aa8345 100755
--- a/script/prepare_build.sh
+++ b/script/prepare_build.sh
@@ -2,19 +2,22 @@
echo "Preparing MySQL configs"
mysql -u root -e 'create database mysql_async_tests;'
+mysql -u root -e "create table mysql_async_tests.transaction_test (id varchar(255) not null, primary key (id))"
mysql -u root -e "GRANT ALL PRIVILEGES ON *.* TO 'mysql_async'@'localhost' IDENTIFIED BY 'root' WITH GRANT OPTION";
mysql -u root -e "GRANT ALL PRIVILEGES ON *.* TO 'mysql_async_old'@'localhost' WITH GRANT OPTION";
mysql -u root -e "UPDATE mysql.user SET Password = OLD_PASSWORD('do_not_use_this'), plugin = 'mysql_old_password' where User = 'mysql_async_old'; flush privileges;";
+mysql -u root -e "GRANT ALL PRIVILEGES ON *.* TO 'mysql_async_nopw'@'localhost' WITH GRANT OPTION";
echo "preparing postgresql configs"
psql -c 'create database netty_driver_test;' -U postgres
psql -c 'create database netty_driver_time_test;' -U postgres
psql -c "alter database netty_driver_time_test set timezone to 'GMT'" -U postgres
+psql -c "create table transaction_test ( id varchar(255) not null, constraint id_unique primary key (id))" -U postgres netty_driver_test
psql -c "CREATE USER postgres_md5 WITH PASSWORD 'postgres_md5'; GRANT ALL PRIVILEGES ON DATABASE netty_driver_test to postgres_md5;" -U postgres
psql -c "CREATE USER postgres_cleartext WITH PASSWORD 'postgres_cleartext'; GRANT ALL PRIVILEGES ON DATABASE netty_driver_test to postgres_cleartext;" -U postgres
psql -c "CREATE USER postgres_kerberos WITH PASSWORD 'postgres_kerberos'; GRANT ALL PRIVILEGES ON DATABASE netty_driver_test to postgres_kerberos;" -U postgres
-psql -d "netty_driver_test" -c "CREATE TYPE example_mood AS ENUM ('sad', 'ok', 'happy');"
+psql -d "netty_driver_test" -c "CREATE TYPE example_mood AS ENUM ('sad', 'ok', 'happy');" -U postgres
sudo chmod 777 /etc/postgresql/9.1/main/pg_hba.conf