Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node HTTP/2 Client/Container #124

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
bin
node_modules
*.hxproj
*.hxproj
*.pem
*.js
2 changes: 1 addition & 1 deletion .haxerc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"version": "4.0.5",
"version": "4.1.2",
"resolveLibs": "scoped"
}
4 changes: 4 additions & 0 deletions haxe_libraries/hxnodejs-http2.hxml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# @install: lix --silent download "gh://github.com/brave-pi/hxnodejs-http2#a0486e5208367acf0c2779f9ece5000348efca50" into hxnodejs-http2/0.0.0/github/a0486e5208367acf0c2779f9ece5000348efca50
-lib hxnodejs
-cp ${HAXE_LIBCACHE}/hxnodejs-http2/0.0.0/github/a0486e5208367acf0c2779f9ece5000348efca50/.
-D hxnodejs-http2=0.0.0
7 changes: 4 additions & 3 deletions haxe_libraries/hxnodejs.hxml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-D hxnodejs=6.9.1
# @install: lix --silent download "gh://github.com/haxefoundation/hxnodejs#38bdefd853f8d637ffb6e74c69ccaedc01985cac" into hxnodejs/6.9.1/github/38bdefd853f8d637ffb6e74c69ccaedc01985cac
-cp ${HAXE_LIBCACHE}/hxnodejs/6.9.1/github/38bdefd853f8d637ffb6e74c69ccaedc01985cac/src
# @install: lix --silent download "gh://github.com/haxefoundation/hxnodejs#276d357b9d3905475ebb3f9ebc15e27b0505aec7" into hxnodejs/12.1.0/github/276d357b9d3905475ebb3f9ebc15e27b0505aec7
-cp ${HAXE_LIBCACHE}/hxnodejs/12.1.0/github/276d357b9d3905475ebb3f9ebc15e27b0505aec7/src
-D hxnodejs=12.1.0
--macro allowPackage('sys')
# should behave like other target defines and not be defined in macro context
--macro define('nodejs')
--macro _internal.SuppressDeprecated.run()
13 changes: 13 additions & 0 deletions http2.hxml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

-cp tests
-dce full
-lib tink_http
-lib hxnodejs
-lib hxnodejs-http2
-main Test
--each
-D client
-js bin/test/client.js
--next
-D server
-js bin/test/server.js
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@
"travix": "haxe -lib travix --run travix.Travix",
"postinstall": "lix download"
},
"dependencies": {}
"dependencies": {
"lix": "^15.9.0"
}
}
3 changes: 3 additions & 0 deletions src/tink/http/Response.hx
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ abstract OutgoingResponse(OutgoingResponseData) {
//TODO: implement

}
public static function ofStream(source:IdealSource, ?code = 200, ?headers:Array<HeaderField>)
return new OutgoingResponse(new ResponseHeader(code, code, headers == null ? [] : headers), source);


@:from static function ofString(s:String)
return blob(s, 'text/plain');
Expand Down
68 changes: 68 additions & 0 deletions src/tink/http/clients/FetchClient.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// #if hxjs_http2
// package tink.http.clients;

// import haxe.DynamicAccess;
// import tink.io.Source;
// import tink.io.Sink;
// import tink.http.Client;
// import tink.http.Header;
// import tink.http.Request;
// import tink.http.Response;

// using tink.CoreApi;

// class FetchClient implements ClientObject {


// public function new() {

// }

// public function request(req:OutgoingRequest):Promise<IncomingResponse> {
// var headers = {
// var map = new OutgoingHttpHeaders();
// map[":path"] = [req.header.url.pathWithQuery];
// map[":method"] = [req.header.method];
// for (h in req.header)
// map[h.name] = [h.value];
// map;
// }

// return fetchRequest(headers, req);
// }

// function fetchRequest<T>(headers:IncomingHttpHeaders, req:OutgoingRequest):Promise<IncomingResponse>
// return Future.async(function(cb) {
// var fwd:js.node.http2.Stream.Http2Stream = agent.request(headers, {endStream: false});
// fwd.on('response', function(headers:IncomingHttpHeaders, _) {
// var status = headers[":status"];
// if (status.length < 0)
// cb(Failure(new Error('Missing status code')));
// else {
// var code = Std.parseInt(status);
// cb(Success(new IncomingResponse(new ResponseHeader(code, null, [
// for (key in headers.keys())
// new HeaderField(key, headers[key])
// ], HTTP2), Source.ofNodeStream('Response from ${req.header.url}', fwd))));
// }
// });
// function fail(e:Error)
// cb(Failure(e));
// fwd.on('error', function(e:#if haxe4 js.lib.Error #else js.Error #end) fail(Error.withData(e.message, e)));
// fwd.on('end', () -> {
// agent.close();
// });
// req.body.pipeTo(Sink.ofNodeStream('Request to ${req.header.url}', fwd)).handle(function(res) {
// trace('done');
// fwd.end();
// switch res {
// case AllWritten:
// case SinkEnded(_):
// fail(new Error(502, 'Gateway Error'));
// case SinkFailed(e, _):
// fail(e);
// }
// });
// });
// }
// #end
74 changes: 74 additions & 0 deletions src/tink/http/clients/NodeClient2.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#if hxnodejs_http2
package tink.http.clients;

import js.node.http2.Session.ClientHttp2Session;
import haxe.DynamicAccess;
import tink.io.Source;
import tink.io.Sink;
import tink.http.Client;
import tink.http.Header;
import tink.http.Request;
import tink.http.Response;
import js.node.http.IncomingMessage;
import js.node.Http2;
import js.node.http2.Session;

using tink.CoreApi;

class NodeClient2 implements ClientObject {
var opts:js.node.Http2.SecureClientSessionOptions;
var agent:ClientHttp2Session;

public function new(url:tink.Url, ?opts) {
this.opts = opts;
this.agent = js.node.Http2.connect('${url.scheme}://${url.host}', opts);
}

public function request(req:OutgoingRequest):Promise<IncomingResponse> {
var headers = {
var map = new OutgoingHttpHeaders();
map[":path"] = [req.header.url.pathWithQuery];
map[":method"] = [req.header.method];
for (h in req.header)
map[h.name] = [h.value];
map;
}

return nodeRequest(headers, req);
}

function nodeRequest(headers:IncomingHttpHeaders, req:OutgoingRequest):Promise<IncomingResponse>
return Future.async(function(cb) {
var fwd:js.node.http2.Stream.Http2Stream = agent.request(headers, {endStream: false});
fwd.on('response', function(headers:IncomingHttpHeaders, _) {
var status = headers[":status"];
if (status.length < 0)
cb(Failure(new Error('Missing status code')));
else {
var code = Std.parseInt(status);
cb(Success(new IncomingResponse(new ResponseHeader(code, null, [
for (key in headers.keys())
new HeaderField(key, headers[key])
], HTTP2), Source.ofNodeStream('Response from ${req.header.url}', fwd))));
}
});
function fail(e:Error)
cb(Failure(e));
fwd.on('error', function(e:#if haxe4 js.lib.Error #else js.Error #end) fail(Error.withData(e.message, e)));
fwd.on('end', () -> {
agent.close();
});
req.body.pipeTo(Sink.ofNodeStream('Request to ${req.header.url}', fwd)).handle(function(res) {
trace('done');
fwd.end();
switch res {
case AllWritten:
case SinkEnded(_):
fail(new Error(502, 'Gateway Error'));
case SinkFailed(e, _):
fail(e);
}
});
});
}
#end
158 changes: 158 additions & 0 deletions src/tink/http/containers/NodeContainer2.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#if hxnodejs_http2
package tink.http.containers;

import js.node.Https.HttpsCreateServerOptions;
import js.node.Http2.Http2ServerResponse;
import js.node.Http2.Http2ServerRequest;
import tink.http.Container;
import tink.http.Request;
import tink.http.Header;
import tink.io.*;
import js.node.http.*;
import #if haxe4 js.lib.Error #else js.Error #end as JsError;

using tink.CoreApi;

class NodeContainer2 implements Container {
var upgradable:Bool;
var kind:ServerKind;

public function new(kind:ServerKind) {
this.kind = kind;
}

static public function toNodeHandler(handler:Handler, ?options:{?body:Http2ServerRequest->IncomingRequestBody}) {
var body = switch options {
case null | {body: null}: function(msg:Http2ServerRequest) return
Plain(Source.ofNodeStream('Incoming HTTP message from ${(msg.socket : Dynamic).remoteEndpoint}', msg.stream));
case _: options.body;
}
return function(req:Http2ServerRequest,
res:Http2ServerResponse) handler.process(new IncomingRequest((req.socket : Dynamic).remoteAddress,
IncomingRequestHeader.fromIncomingMessage((req : Dynamic)), body(req)))
.handle(function(out) {
var headers = new Map();
for (h in out.header) {
if (!headers.exists(h.name))
headers[h.name] = [];
headers[h.name].push(h.value);
}
for (name in headers.keys())
res.setHeader(name, headers[name]);
res.writeHead(out.header.statusCode, out.header.reason); // TODO: readable status code
out.body.pipeTo(Sink.ofNodeStream('Outgoing HTTP response to ${(req.socket : Dynamic).remoteAddress}', res.stream)).handle(function(x) {
res.end();
});
});
}

public function runSecure(?cfg:Null<js.node.Http2.SecureServerOptions>, handler:Handler)
return Future.async(function(cb) {
var failures = Signal.trigger();
var createServer = if (cfg == null) () -> js.node.Http2.createSecureServer()
else
() -> js.node.Http2.createSecureServer(cfg);
boot(cast createServer, failures, handler).handle(cb);
});

public function runWithOptions(?cfg:Null<js.node.Http2.ServerOptions>, handler:Handler)
return Future.async(function(cb) {
var failures = Signal.trigger();
var createServer = if (cfg == null) () -> js.node.Http2.createServer()
else
() -> js.node.Http2.createServer(cfg);
boot(cast createServer, failures, handler).handle(cb);
});

public function run(handler:Handler)
return Future.async(function(cb) {
var failures = Signal.trigger();
var createServer = () -> js.node.Http2.createServer();
boot(cast createServer, failures, handler).handle(cb);
});

function boot(createServer:Void->ServerLike, failures:Signal<ContainerFailure>, handler:Handler)
return Future.async(function(cb) {
var server:ServerLike = switch kind {
case Instance(server):
server;
case Port(port):
var server = createServer();
server.listen(port);
server;
case Host(host):
var server = createServer();
server.listen('${host.name}:${host.port}');
server;
case Path(path):
var server = createServer();
server.listen(path);
server;
case Fd(fd):
var server = createServer();
server.listen(fd);
server;
}
server.on('error', function(e) {
cb(Failed(e));
});
function onListen() {
cb(Running({
shutdown: function(hard:Bool) {
if (hard)
trace('Warning: hard shutdown not implemented');

return Future.async(function(cb) {
server.close(function() cb(true));
});
},
failures: failures, // TODO: these need to be triggered
}));
}
if (untyped server.listening) // .listening added in v5.7.0, not added to hxnodejs yet
onListen()
else
server.on('listening', onListen);
server.on('request', toNodeHandler(handler));
server.on('error', function(e) cb(Failed(e)));
});
}

typedef ServerLike = {
var listening(default, null):Bool;
function listen(i:Dynamic, ?cb:Null<Void->Void>):Void;
function on(type:String, cb:haxe.Constraints.Function):Void;
function close(cb:() -> Void):Void;
}

private enum ServerKindBase {
Instance(server:ServerLike);
Port(port:Int);
Host(host:tink.url.Host);
Path(path:String);
Fd(fd:{fd:Int});
}

abstract ServerKind(ServerKindBase) from ServerKindBase to ServerKindBase {
@:from
public static inline function fromInstance(server:ServerLike):ServerKind
return Instance(server);

@:from
public static inline function fromPort(port:Int):ServerKind
return Port(port);

@:from
public static inline function fromHost(host:tink.url.Host):ServerKind
return Host(host);

@:from
public static inline function fromPath(path:String):ServerKind
return Path(path);

@:from
public static inline function fromFd(fd:{fd:Int}):ServerKind
return Fd(fd);
}

#end
Loading