Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Real body #103

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/tink/http/Request.hx
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class OutgoingRequestHeader extends RequestHeader {
return new OutgoingRequestHeader(method, url, protocol, this.fields.concat(fields));
}

class OutgoingRequest extends Message<OutgoingRequestHeader, IdealSource> {}
class OutgoingRequest extends Message<OutgoingRequestHeader, RealSource> {}

class IncomingRequest extends Message<IncomingRequestHeader, IncomingRequestBody> {

Expand Down
2 changes: 1 addition & 1 deletion src/tink/http/Response.hx
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class ResponseHeaderBase extends Header {
);
}

private class OutgoingResponseData extends Message<ResponseHeader, IdealSource> {}
private class OutgoingResponseData extends Message<ResponseHeader, RealSource> {}

@:forward
abstract OutgoingResponse(OutgoingResponseData) {
Expand Down
5 changes: 2 additions & 3 deletions src/tink/http/clients/CurlClient.hx
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ using tink.CoreApi;

// Does not restrict to any platform as long as they can run the curl command somehow
class CurlClient implements ClientObject {

var protocol:String = 'http';

public function new(?curl:Array<String>->IdealSource->RealSource) {
public function new(?curl:Array<String>->RealSource->RealSource) {
if(curl != null) this.curl = curl;
}

Expand Down Expand Up @@ -45,7 +44,7 @@ class CurlClient implements ClientObject {
.next(function (p) return new IncomingResponse(p.a, p.b));
}

dynamic function curl(args:Array<String>, body:IdealSource):RealSource {
dynamic function curl(args:Array<String>, body:RealSource):RealSource {
#if (sys || nodejs)
args.push('--data-binary');
args.push('@-');
Expand Down
5 changes: 4 additions & 1 deletion src/tink/http/clients/JsClient.hx
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ class JsClient implements ClientObject {
if(req.header.method == GET)
http.send();
else
req.body.all().handle(function(chunk) http.send(new Int8Array(chunk.toBytes().getData())));
req.body.all().handle(function(o) switch o {
case Success(chunk): http.send(new Int8Array(chunk.toBytes().getData()));
case Failure(e): cb(Failure(e));
});
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/tink/http/clients/NodeClient.hx
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class NodeClient implements ClientObject {
switch res {
case AllWritten:
case SinkEnded(_): fail(new Error(502, 'Gateway Error'));
case SinkFailed(e, _): fail(e);
case SourceFailed(e) | SinkFailed(e, _): fail(e);
}
});
});
Expand Down
76 changes: 39 additions & 37 deletions src/tink/http/clients/PhpClient.hx
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,45 @@ class PhpClient implements ClientObject {

public function request(req:OutgoingRequest):Promise<IncomingResponse> {
return Future.async(function(cb) {
req.body.all().handle(function(chunk) {
var options = php.Lib.associativeArrayOfObject({
http: php.Lib.associativeArrayOfObject({
// protocol_version: // TODO: req does not define the version?
header: [for(h in req.header) h.toString()].join('\r\n') + '\r\n',
method: req.header.method,
content: chunk.toBytes().getData().toString(),
}),
});
var context = untyped __call__('stream_context_create', options);
var url:String = req.header.url;
var result = @:privateAccess new sys.io.FileInput(untyped __call__('fopen', url, 'rb', false, context));

var rawHeaders:Array<String> = cast php.Lib.toHaxeArray(untyped __php__("$http_response_header"));

// http://php.net/manual/en/reserved.variables.httpresponseheader.php#122362
// $http_response_header includes all the "history" headers in case of redirected response
var i = rawHeaders.length;
while(i-- >= 0) if(rawHeaders[i].startsWith('HTTP/')) break;
rawHeaders = rawHeaders.slice(i);

// construct the header object
var head = rawHeaders[0].split(' ');
var headers = [for(i in 1...rawHeaders.length) {
var line = rawHeaders[i];
var index = line.indexOf(': ');
new HeaderField(line.substr(0, index), line.substr(index + 2));
}];
var header = new ResponseHeader(Std.parseInt(head[1]), head.slice(2).join(' '), headers);
cb(Success(new IncomingResponse(header, result.readAll())));

// var headers:IdealSource = php.Lib.toHaxeArray(untyped __php__("$http_response_header")).join('\r\n') + '\r\n';
// headers.parse(ResponseHeader.parser()).handle(function(o) switch o {
// case Success(parsed):
// case Failure(e):
// cb(Failure(e));
// });
req.body.all().handle(function(o) switch o {
case Success(chunk):
var options = php.Lib.associativeArrayOfObject({
http: php.Lib.associativeArrayOfObject({
// protocol_version: // TODO: req does not define the version?
header: [for(h in req.header) h.toString()].join('\r\n') + '\r\n',
method: req.header.method,
content: chunk.toBytes().getData().toString(),
}),
});
var context = untyped __call__('stream_context_create', options);
var url:String = req.header.url;
var result = @:privateAccess new sys.io.FileInput(untyped __call__('fopen', url, 'rb', false, context));

var rawHeaders:Array<String> = cast php.Lib.toHaxeArray(untyped __php__("$http_response_header"));

// http://php.net/manual/en/reserved.variables.httpresponseheader.php#122362
// $http_response_header includes all the "history" headers in case of redirected response
var i = rawHeaders.length;
while(i-- >= 0) if(rawHeaders[i].startsWith('HTTP/')) break;
rawHeaders = rawHeaders.slice(i);

// construct the header object
var head = rawHeaders[0].split(' ');
var headers = [for(i in 1...rawHeaders.length) {
var line = rawHeaders[i];
var index = line.indexOf(': ');
new HeaderField(line.substr(0, index), line.substr(index + 2));
}];
var header = new ResponseHeader(Std.parseInt(head[1]), head.slice(2).join(' '), headers);
cb(Success(new IncomingResponse(header, result.readAll())));

// var headers:IdealSource = php.Lib.toHaxeArray(untyped __php__("$http_response_header")).join('\r\n') + '\r\n';
// headers.parse(ResponseHeader.parser()).handle(function(o) switch o {
// case Success(parsed):
// case Failure(e):
// cb(Failure(e));
// });
case Failure(e): cb(Failure(e));
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/tink/http/clients/SocketClient.hx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class SocketClient implements ClientObject {
});

case SinkEnded(_): cb(Failure(new Error('Sink ended unexpectedly')));
case SinkFailed(e, _): cb(Failure(e));
case SourceFailed(e) | SinkFailed(e, _): cb(Failure(e));
}
});
});
Expand Down
9 changes: 6 additions & 3 deletions src/tink/http/clients/StdClient.hx
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ class StdClient implements ClientObject {
case GET | HEAD | OPTIONS:
send(false);
default:
req.body.all().handle(function(bytes) {
r.setPostData(bytes.toString());
send(true);
req.body.all().handle(function(o) switch o {
case Success(chunk):
r.setPostData(chunk.toString());
send(true);
case Failure(e):
cb(Failure(e));
});
}
});
Expand Down
11 changes: 9 additions & 2 deletions src/tink/http/containers/NodeContainer.hx
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,15 @@ class NodeContainer implements Container {
Plain(Source.ofNodeStream('Incoming HTTP message from ${req.socket.remoteAddress}', req)))
).handle(function (out) {
res.writeHead(out.header.statusCode, out.header.reason, cast [for (h in out.header) [(h.name : String), h.value]]);//TODO: readable status code
out.body.pipeTo(Sink.ofNodeStream('Outgoing HTTP response to ${req.socket.remoteAddress}', res)).handle(function (x) {
res.end();
out.body.pipeTo(Sink.ofNodeStream('Outgoing HTTP response to ${req.socket.remoteAddress}', res)).handle(function (x) switch x {
case AllWritten:
res.end();
case SourceFailed(e):
// TODO: res.addTrailers(...);
var socket:js.node.net.Socket = untyped res.socket; // res.socket is not defined in hxnodejs?!
socket.end();
case SinkFailed(e, _):
case SinkEnded(_): // shouldn't happen?
});
});

Expand Down