Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Port everything to asyncio (#14)
Browse files Browse the repository at this point in the history
* bump everything on the planet

* bumpre more crap

* general progress

* port this file

* port another module

* cargo fmt

* pretty sure this must be a result

* cargo fmt

* I think these need to be results

* bring these back

* progress

* type fixes

* more progress

* cargo fmt

* cargo update futures-await

* Try this...

* progress...

* uptake ring/rustls and friends

* attempted travis fixes

* dep bump

* the lib portion now compiles!

* switch from Box to references with lifetimes

* remove more box

* cargo fmt

* forward progress on compiling

* more progress

* more

* enough progress to get more errors

* Port submit

* Progress

* Comment this out so that we can make progress on compilation

* knock out a TODO

* this looks way saner

* bump future-await

* progress

* oops, this needs a fucntion

* format this differently

* more progress

* fixes

* TODO

* fix a warning with this silliness

* this isnt for anything

* fix

* less code

* reorganize code

* move this around

* cargo update

* cargo fmt

* cargo update

* cargo update

* cargo update

* some comments, and a cleanup

* fix

* some progress on these errors

* one left!

* IT COMPILES!

* cargo fmt

* fix submission

* style

* port the whole check to be stream based

* Parallelism!

* there's a pool by default

* bump serde

* bumped futures-await

* cargo update gcc

* cargo update

* cargo update futures

* cargo update

* cargo update serde

* cargo update

* cargo update libc

* cargo update

* cargo update

* This got fixed apparently

* clippy fixes (requires the very latest nightly)

* more clippy fixes!

* Do what clippy tells me

* Do what clippy tells me

* more aggressive fmting

* Fixed #15 -- added a --all-logs instead of --log-urls

* Added a timeout to submits

* obey clippy

* Drop tokio-proto and do everything myself... seems to work

* Don't double borrow

* cargo update

* Simplify

* Simplify!

* Order things in check, and don't print the sct in submit

* Fixed formatting certs
  • Loading branch information
alex authored Sep 23, 2017
1 parent bc98b0a commit ae951b6
Show file tree
Hide file tree
Showing 10 changed files with 930 additions and 373 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
sudo: false
dist: trusty
language: rust
cache: cargo
rust:
- stable
- nightly
before_script:
- export PATH=$HOME/.cargo/bin:$PATH
Expand Down
634 changes: 511 additions & 123 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 13 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ version = "0.1.0"
authors = ["Alex Gaynor <[email protected]>"]

[dependencies]
futures = "*"
futures-await = { git = "https://github.com/alexcrichton/futures-await" }
tokio-core = "*"
tokio-io = "*"
tokio-process = "*"
tokio-rustls = "*"
tokio-service = "*"
net2 = "*"

acme-client = "*"

base64 = "*"
Expand All @@ -16,16 +25,14 @@ clap = "*"

hex = "*"

hyper = "< 0.11"
hyper-rustls = ">=0.6"
hyper = ">=0.11"
hyper-rustls = ">=0.11"

openssl = "*"

rayon = "*"

ring = "*"
ring = ">=0.12"

rustls = { version = ">=0.8", features = ["dangerous_configuration"] }
rustls = { version = ">=0.11", features = ["dangerous_configuration"] }

serde = "*"
serde_derive = "*"
Expand Down
3 changes: 2 additions & 1 deletion rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
reorder_imports = true
reorder_imported_names = true
reorder_imports_in_group = true
normalize_imports = true
normalize_comments = false
normalize_comments = true
87 changes: 56 additions & 31 deletions src/crtsh.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,73 @@
use super::common::sha256_hex;
use super::ct::AddChainRequest;
use base64;
use futures::prelude::*;
use hyper;
use serde_json;
use url;

pub fn build_chain_for_cert(http_client: &hyper::Client, cert: &[u8]) -> Option<Vec<Vec<u8>>> {
pub fn build_chain_for_cert<C: hyper::client::Connect>(
http_client: &hyper::Client<C>,
cert: &[u8],
) -> impl Future<Item = Vec<Vec<u8>>, Error = ()> {
let body = url::form_urlencoded::Serializer::new(String::new())
.append_pair("b64cert", &base64::encode(cert))
.append_pair("b64cert", &base64::encode(&cert))
.append_pair("onlyonechain", "Y")
.finish();
let body_bytes = body.as_bytes();
let response = match http_client
.post("https://crt.sh/gen-add-chain")
.header(hyper::header::ContentType::form_url_encoded())
.header(hyper::header::Connection::keep_alive())
.body(hyper::client::Body::BufBody(body_bytes, body_bytes.len()))
.send() {
Ok(response) => response,
// TODO: maybe be more selective in error handling
Err(_) => return None,
};
let mut request = hyper::Request::new(
hyper::Method::Post,
"https://crt.sh/gen-add-chain".parse().unwrap(),
);
request.headers_mut().set(
hyper::header::ContentType::form_url_encoded(),
);
request.headers_mut().set(
hyper::header::Connection::keep_alive(),
);
request.set_body(body.into_bytes());
// TODO: undo this once lifetime bugs are fixed
let r = http_client.request(request);
async_block! {
let response = match await!(r) {
Ok(response) => response,
// TODO: maybe be more selective in error handling
Err(_) => return Err(()),
};

if response.status == hyper::status::StatusCode::NotFound {
return None;
}
if response.status() == hyper::StatusCode::NotFound {
return Err(());
}

let add_chain_request: AddChainRequest = serde_json::from_reader(response).unwrap();
Some(
add_chain_request
.chain
.iter()
.map(|c| base64::decode(c).unwrap())
.collect(),
)
let body = await!(response.body().concat2()).unwrap();
let add_chain_request: AddChainRequest = serde_json::from_slice(&body).unwrap();
Ok(
add_chain_request
.chain
.iter()
.map(|c| base64::decode(c).unwrap())
.collect(),
)
}
}

pub fn is_cert_logged(http_client: &hyper::Client, cert: &[u8]) -> bool {
let response = http_client
.get(&format!("https://crt.sh/?d={}", sha256_hex(cert)))
.header(hyper::header::Connection::keep_alive())
.send()
.unwrap();
response.status == hyper::status::StatusCode::Ok
pub fn is_cert_logged<C: hyper::client::Connect>(
http_client: &hyper::Client<C>,
cert: &[u8],
) -> impl Future<Item = bool, Error = ()> {
let mut request = hyper::Request::new(
hyper::Method::Get,
format!("https://crt.sh/?d={}", sha256_hex(cert))
.parse()
.unwrap(),
);
request.headers_mut().set(
hyper::header::Connection::keep_alive(),
);
let r = http_client.request(request);
async_block! {
let response = await!(r).unwrap();
Ok(response.status() == hyper::StatusCode::Ok)
}
}

pub fn url_for_cert(cert: &[u8]) -> String {
Expand Down
98 changes: 58 additions & 40 deletions src/ct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ use super::common::Log;

use base64;
use byteorder::{BigEndian, WriteBytesExt};

use futures;
use futures::prelude::*;
use hyper;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use serde_json;
use std::io::{Read, Write};
use std::io::Write;
use std::time::Duration;
use tokio_core::reactor::Timeout;


#[derive(Debug, Deserialize)]
Expand All @@ -28,7 +32,7 @@ impl SignedCertificateTimestamp {
b.write_u64::<BigEndian>(self.timestamp).unwrap();

let extensions = base64::decode(&self.extensions).unwrap();
assert!(extensions.len() <= 65535);
assert!(extensions.len() <= 65_535);
b.write_u16::<BigEndian>(extensions.len() as u16).unwrap();
b.write_all(&extensions).unwrap();

Expand All @@ -40,59 +44,73 @@ impl SignedCertificateTimestamp {
}


fn submit_to_log(
http_client: &hyper::Client,
url: &str,
payload: &[u8],
) -> Option<SignedCertificateTimestamp> {
let mut url = "https://".to_string() + url;
fn submit_to_log<C: hyper::client::Connect>(
http_client: &hyper::Client<C>,
log: &Log,
payload: Vec<u8>,
) -> impl Future<Item = SignedCertificateTimestamp, Error = ()> {
let mut url = "https://".to_string() + &log.url;
if !url.ends_with('/') {
url += "/";
}
url += "ct/v1/add-chain";
let response = http_client
.post(&url)
.body(hyper::client::Body::BufBody(payload, payload.len()))
.header(hyper::header::ContentType::json())
.send();
let response = match response {
Ok(r) => r,
// TODO: maybe not all of these should be silently ignored.
Err(_) => return None,
};

// 400, 403, and probably some others generally indicate a log doesn't accept certs from this
// root, or that the log isn't accepting new submissions. Server errors mean there's nothing we
// can do.
if response.status.is_client_error() || response.status.is_server_error() {
return None;
let mut request = hyper::Request::new(hyper::Method::Post, url.parse().unwrap());
request.headers_mut().set(
hyper::header::ContentType::json(),
);
request.set_body(payload);
let r = http_client.request(request);
async_block! {
let response = match await!(r) {
Ok(r) => r,
// TODO: maybe not all of these should be silently ignored.
Err(_) => return Err(()),
};

// 400, 403, and probably some others generally indicate a log doesn't accept certs from
// this root, or that the log isn't accepting new submissions. Server errors mean there's
// nothing we can do.
if response.status().is_client_error() || response.status().is_server_error() {
return Err(());
}

// Limt the response to 10MB (well above what would ever be needed) to be resilient to DoS
// in the face of a dumb or malicious log.
let body = await!(response.body().take(10 * 1024 * 1024).concat2())
.unwrap();
Ok(serde_json::from_slice(&body).unwrap())
}

// Limt the response to 10MB (well above what would ever be needed) to be resilient to DoS in
// the face of a dumb or malicious log.
Some(
serde_json::from_reader(response.take(10 * 1024 * 1024)).unwrap(),
)
}

#[derive(Serialize, Deserialize)]
pub struct AddChainRequest {
pub chain: Vec<String>,
}

pub fn submit_cert_to_logs<'a>(
http_client: &hyper::Client,
logs: &'a [Log],
pub fn submit_cert_to_logs<C: hyper::client::Connect>(
http_client: &hyper::Client<C>,
logs: &[Log],
cert: &[Vec<u8>],
) -> Vec<(&'a Log, SignedCertificateTimestamp)> {
) -> impl Future<Item = Vec<(usize, SignedCertificateTimestamp)>, Error = ()> {
let payload = serde_json::to_vec(&AddChainRequest {
chain: cert.iter().map(|r| base64::encode(r)).collect(),
}).unwrap();

logs.par_iter()
.filter_map(|log| {
let sct = submit_to_log(http_client, &log.url, &payload);
sct.map(|s| (log, s))
let futures = logs.iter()
.enumerate()
.map(move |(idx, log)| {
let timeout = Timeout::new(Duration::from_secs(5), http_client.handle()).unwrap();

let payload = payload.clone();
let s = submit_to_log(http_client, log, payload).select2(timeout);
async_block! {
match await!(s) {
Ok(futures::future::Either::A((sct, _))) => Ok(Some((idx, sct))),
_ => Ok(None),
}
}
})
.collect()
.collect::<Vec<_>>();

futures::future::join_all(futures).map(|scts| scts.into_iter().filter_map(|s| s).collect())
}
75 changes: 49 additions & 26 deletions src/google.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use super::common::Log;

use futures::prelude::*;

use hyper;
use serde_json;
use std::io::Read;


const LOG_LIST_URL: &'static str = "https://www.gstatic.com/ct/log_list/log_list.json";
const TRUSTED_LOG_LIST_URL: &'static str = "https://www.gstatic.com/ct/log_list/log_list.json";
const ALL_LOG_LIST_URL: &'static str = "https://www.gstatic.com/ct/log_list/all_logs_list.json";

#[derive(Deserialize)]
struct LogsResponseLogs {
description: String,
Expand All @@ -26,28 +29,48 @@ struct LogsResponse {
operators: Vec<LogsResponseOperators>,
}

pub fn fetch_trusted_ct_logs(http_client: &hyper::Client) -> Vec<Log> {
let response = http_client.get(LOG_LIST_URL).send().unwrap();
// Limit the response to 10MB at most, to be resillient to DoS.
let logs_response: LogsResponse = serde_json::from_reader(response.take(10 * 1024 * 1024))
.unwrap();

let google_id = logs_response
.operators
.iter()
.find(|o| o.name == "Google")
.map(|o| o.id);

logs_response
.logs
.into_iter()
.filter(|log| log.disqualified_at.is_none())
.map(|log| {
Log {
url: log.url,
description: log.description,
is_google: log.operated_by.contains(&google_id.unwrap()),
}
})
.collect()
pub fn fetch_trusted_ct_logs<'a, C: hyper::client::Connect>(
http_client: &'a hyper::Client<C>,
) -> impl Future<Item = Vec<Log>, Error = ()> + 'a {
fetch_log_list(http_client, TRUSTED_LOG_LIST_URL.parse().unwrap())
}

pub fn fetch_all_ct_logs<'a, C: hyper::client::Connect>(
http_client: &'a hyper::Client<C>,
) -> impl Future<Item = Vec<Log>, Error = ()> + 'a {
fetch_log_list(http_client, ALL_LOG_LIST_URL.parse().unwrap())
}

fn fetch_log_list<'a, C: hyper::client::Connect>(
http_client: &'a hyper::Client<C>,
uri: hyper::Uri,
) -> impl Future<Item = Vec<Log>, Error = ()> + 'a {
async_block! {
let response = await!(http_client.get(uri)).unwrap();
// Limit the response to 10MB at most, to be resillient to DoS.
let body = await!(response.body().take(10 * 1024 * 1024).concat2()).unwrap();
let logs_response: LogsResponse = serde_json::from_slice(&body).unwrap();

let google_id = logs_response
.operators
.iter()
.find(|o| o.name == "Google")
.map(|o| o.id)
.unwrap();

Ok(
logs_response
.logs
.into_iter()
.filter(|log| log.disqualified_at.is_none())
.map(move |log| {
Log {
url: log.url,
description: log.description,
is_google: log.operated_by.contains(&google_id),
}
})
.collect(),
)
}
}
Loading

0 comments on commit ae951b6

Please sign in to comment.