Skip to content

Commit

Permalink
Merge pull request #30 from driftluo/dump-to-0.3.5
Browse files Browse the repository at this point in the history
Dump to 0.3.5
  • Loading branch information
driftluo authored Sep 20, 2018
2 parents 43c3be9 + a1e6bf0 commit d5172c6
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 108 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "influx_db_client"
version = "0.3.4"
version = "0.3.5"
authors = ["piaoliu <[email protected]>"]
documentation = "https://docs.rs/influx_db_client/"
repository = "https://github.com/driftluo/InfluxDBClient-rs"
Expand Down
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@ This project has been able to run properly, PR is welcome.

## Usage

### Recommend
### Use

```
[dependencies]
influx_db_client = "^0.3.4"
[patch.crates-io]
influx_db_client = { git = 'https://github.com/driftluo/InfluxDBClient-rs' }
influx_db_client = "^0.3.5"
```

### http
Expand Down Expand Up @@ -88,7 +85,7 @@ fn main() {

This is the [API Document](https://docs.influxdata.com/influxdb/v1.2/tools/api/), it may apply to version 1.0 or higher.

I have tested it in version 1.0.2 and 1.3.5.
I have tested it in version 1.0.2/1.3.5/1.5.

## Thanks

Expand Down
68 changes: 36 additions & 32 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use serde_json;
use serde_json::de::IoRead as SerdeIoRead;
use hyper::client::Client as hyper_client;
use hyper::client::RequestBuilder;
use hyper::client::Response;
use hyper::net::HttpsConnector;
use hyper::client::RequestBuilder;
use hyper::Url;
use hyper_native_tls::native_tls::TlsConnector;
use hyper_native_tls::NativeTlsClient;
use hyper::Url;
use serde_json;
use serde_json::de::IoRead as SerdeIoRead;
use std::io::Read;
use std::time::Duration;
use std::net::UdpSocket;
use std::iter::FromIterator;
use std::net::UdpSocket;
use std::net::{SocketAddr, ToSocketAddrs};
use {error, serialization, Node, Point, Points, Precision, Query, ChunkedQuery};
use std::time::Duration;
use {error, serialization, ChunkedQuery, Node, Point, Points, Precision, Query};

/// The client to influxdb
#[derive(Debug)]
Expand Down Expand Up @@ -137,10 +137,9 @@ impl Client {
None => param.push(("precision", "s")),
};

match rp {
Some(t) => param.push(("rp", t)),
None => (),
};
if let Some(t) = rp {
param.push(("rp", t))
}

let url = self.build_url("write", Some(param));

Expand All @@ -150,12 +149,12 @@ impl Client {

match res.status_raw().0 {
204 => Ok(()),
400 => Err(error::Error::SyntaxError(serialization::conversion(err))),
400 => Err(error::Error::SyntaxError(serialization::conversion(&err))),
401 | 403 => Err(error::Error::InvalidCredentials(
"Invalid authentication credentials.".to_string(),
)),
404 => Err(error::Error::DataBaseDoesNotExist(
serialization::conversion(err),
serialization::conversion(&err),
)),
500 => Err(error::Error::RetentionPolicyDoesNotExist(err)),
_ => Err(error::Error::Unknow("There is something wrong".to_string())),
Expand All @@ -180,7 +179,7 @@ impl Client {
q: &str,
epoch: Option<Precision>,
) -> Result<ChunkedQuery<SerdeIoRead<Response>>, error::Error> {
return self.query_raw_chunked(q, epoch);
self.query_raw_chunked(q, epoch)
}

/// Drop measurement
Expand Down Expand Up @@ -405,12 +404,16 @@ impl Client {
}
}

fn send_request(&self, q: &str, epoch: Option<Precision>, chunked: bool) -> Result<Response, error::Error> {
fn send_request(
&self,
q: &str,
epoch: Option<Precision>,
chunked: bool,
) -> Result<Response, error::Error> {
let mut param = vec![("db", self.db.as_str()), ("q", q)];

match epoch {
Some(ref t) => param.push(("epoch", t.to_str())),
None => (),
if let Some(ref t) = epoch {
param.push(("epoch", t.to_str()))
}

if chunked {
Expand All @@ -423,9 +426,9 @@ impl Client {
let mut res = {
if q_lower.starts_with("select") && !q_lower.contains("into")
|| q_lower.starts_with("show")
{
self.client.get(url).send()?
} else {
{
self.client.get(url).send()?
} else {
self.client.post(url).send()?
}
};
Expand All @@ -438,9 +441,9 @@ impl Client {
let json_data: Query = serde_json::from_str(&context).unwrap();

Err(error::Error::SyntaxError(serialization::conversion(
json_data.error.unwrap(),
&json_data.error.unwrap(),
)))
},
}
401 | 403 => Err(error::Error::InvalidCredentials(
"Invalid authentication credentials.".to_string(),
)),
Expand All @@ -456,14 +459,18 @@ impl Client {
let _ = response.read_to_string(&mut context);

let json_data: Query = serde_json::from_str(&context).unwrap();
return Ok(json_data);
Ok(json_data)
}

/// Query and return to the native json structure
fn query_raw_chunked(&self, q: &str, epoch: Option<Precision>) -> Result<ChunkedQuery<SerdeIoRead<Response>>, error::Error> {
fn query_raw_chunked(
&self,
q: &str,
epoch: Option<Precision>,
) -> Result<ChunkedQuery<SerdeIoRead<Response>>, error::Error> {
let response = self.send_request(q, epoch, true)?;
let stream = serde_json::Deserializer::from_reader(response).into_iter::<Query>();
return Ok(stream);
Ok(stream)
}

/// Constructs the full URL for an API call.
Expand All @@ -472,12 +479,9 @@ impl Client {

let mut authentication = Vec::new();

match self.authentication {
Some(ref t) => {
authentication.push(("u", &t.0));
authentication.push(("p", &t.1));
}
None => {}
if let Some(ref t) = self.authentication {
authentication.push(("u", &t.0));
authentication.push(("p", &t.1));
}

let url = Url::parse_with_params(url.as_str(), authentication).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;
use std::io;
use hyper;
use std::error::Error as StdError;
use std::fmt;
use std::io;

/// The error of influxdb client
#[derive(Debug, Deserialize, Serialize)]
Expand Down
44 changes: 19 additions & 25 deletions src/keys.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde_json;
use std::collections::HashMap;
use std::iter::FromIterator;
use serde_json;
use std::iter::Iterator;

/// Influxdb value, Please look at [this address](https://docs.influxdata.com/influxdb/v1.3/write_protocols/line_protocol_reference/)
Expand Down Expand Up @@ -142,7 +142,7 @@ pub struct Series {
}

/// Time accuracy
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum Precision {
/// n
Nanoseconds,
Expand Down Expand Up @@ -187,29 +187,23 @@ macro_rules! points {
/// Create Point by macro
#[macro_export]
macro_rules! point {
($x:expr) => {
{
Point::new($x)
}
};
($x:expr, $y:expr, $z:expr) => {
{
Point {
measurement: String::from($x),
tags: $y,
fields: $z,
timestamp: None
}
($x:expr) => {{
Point::new($x)
}};
($x:expr, $y:expr, $z:expr) => {{
Point {
measurement: String::from($x),
tags: $y,
fields: $z,
timestamp: None,
}
};
($x:expr, $y:expr, $z:expr, $a:expr) => {
{
Point {
measurement: String::from($x),
tags: $y,
fields: $z,
timestamp: Some($a)
}
}};
($x:expr, $y:expr, $z:expr, $a:expr) => {{
Point {
measurement: String::from($x),
tags: $y,
fields: $z,
timestamp: Some($a),
}
};
}};
}
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ extern crate serde;
extern crate serde_derive;
extern crate serde_json;

/// Serialization module
pub mod serialization;
/// Error module
pub mod error;
/// All API on influxdb client, Including udp, http
pub mod client;
/// Error module
pub mod error;
/// Points and Query Data Deserialize
pub mod keys;
/// Serialization module
pub mod serialization;

pub use client::{Client, TLSOption, UdpClient};
pub use keys::{Node, Point, Points, Precision, Query, ChunkedQuery, Series, Value};
pub use error::Error;
pub use keys::{ChunkedQuery, Node, Point, Points, Precision, Query, Series, Value};
43 changes: 17 additions & 26 deletions src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use {Point, Value};
pub(crate) fn line_serialization<T: Iterator<Item = Point>>(points: T) -> String {
let mut line = Vec::new();
for point in points {
line.push(escape_measurement(point.measurement));
line.push(escape_measurement(&point.measurement));

for (tag, value) in point.tags.into_iter() {
for (tag, value) in point.tags {
line.push(",".to_string());
line.push(escape_keys_and_tags(tag.to_string()));
line.push(escape_keys_and_tags(&tag));
line.push("=".to_string());

match value {
Value::String(s) => line.push(escape_keys_and_tags(s.to_string())),
Value::String(s) => line.push(escape_keys_and_tags(&s)),
Value::Float(f) => line.push(f.to_string()),
Value::Integer(i) => line.push(i.to_string() + "i"),
Value::Boolean(b) => line.push({
Expand All @@ -27,7 +27,7 @@ pub(crate) fn line_serialization<T: Iterator<Item = Point>>(points: T) -> String

let mut was_first = true;

for (field, value) in point.fields.into_iter() {
for (field, value) in point.fields {
line.push(
{
if was_first {
Expand All @@ -38,12 +38,12 @@ pub(crate) fn line_serialization<T: Iterator<Item = Point>>(points: T) -> String
}
}.to_string(),
);
line.push(escape_keys_and_tags(field.to_string()));
line.push(escape_keys_and_tags(&field));
line.push("=".to_string());

match value {
Value::String(s) => line.push(escape_string_field_value(
s.to_string().replace("\\\"", "\\\\\""),
&s.replace("\\\"", "\\\\\""),
)),
Value::Float(f) => line.push(f.to_string()),
Value::Integer(i) => line.push(i.to_string() + "i"),
Expand All @@ -57,12 +57,9 @@ pub(crate) fn line_serialization<T: Iterator<Item = Point>>(points: T) -> String
}
}

match point.timestamp {
Some(t) => {
line.push(" ".to_string());
line.push(t.to_string());
}
_ => {}
if let Some(t) = point.timestamp {
line.push(" ".to_string());
line.push(t.to_string());
}

line.push("\n".to_string())
Expand All @@ -88,7 +85,7 @@ pub(crate) fn quote_literal(value: &str) -> String {
}

#[inline]
pub(crate) fn conversion(value: String) -> String {
pub(crate) fn conversion(value: &str) -> String {
value
.replace("\'", "")
.replace("\"", "")
Expand All @@ -98,20 +95,20 @@ pub(crate) fn conversion(value: String) -> String {
}

#[inline]
fn escape_keys_and_tags(value: String) -> String {
fn escape_keys_and_tags(value: &str) -> String {
value
.replace(",", "\\,")
.replace("=", "\\=")
.replace(" ", "\\ ")
}

#[inline]
fn escape_measurement(value: String) -> String {
fn escape_measurement(value: &str) -> String {
value.replace(",", "\\,").replace(" ", "\\ ")
}

#[inline]
fn escape_string_field_value(value: String) -> String {
fn escape_string_field_value(value: &str) -> String {
format!("\"{}\"", value.replace("\"", "\\\""))
}

Expand All @@ -136,25 +133,19 @@ mod test {
#[test]
fn escape_keys_and_tags_test() {
assert_eq!(
escape_keys_and_tags(String::from("foo, hello=world")),
escape_keys_and_tags("foo, hello=world"),
"foo\\,\\ hello\\=world"
)
}

#[test]
fn escape_measurement_test() {
assert_eq!(
escape_measurement(String::from("foo, hello")),
"foo\\,\\ hello"
)
assert_eq!(escape_measurement("foo, hello"), "foo\\,\\ hello")
}

#[test]
fn escape_string_field_value_test() {
assert_eq!(
escape_string_field_value(String::from("\"foo")),
"\"\\\"foo\""
)
assert_eq!(escape_string_field_value("\"foo"), "\"\\\"foo\"")
}

#[test]
Expand Down
Loading

0 comments on commit d5172c6

Please sign in to comment.