From 404952ff4afa9d5c057e5f24f5688e475cfbcbfd Mon Sep 17 00:00:00 2001 From: Gabriel Hayes Date: Mon, 20 Jul 2020 22:45:27 -0500 Subject: [PATCH 1/2] Node HTTP/2 Client/Container --- .gitignore | 4 +- .haxerc | 2 +- haxe_libraries/hxnodejs-http2.hxml | 4 + haxe_libraries/hxnodejs.hxml | 7 +- http2.hxml | 10 ++ package-lock.json | 13 ++ package.json | 4 +- src/tink/http/Response.hx | 3 + src/tink/http/clients/FetchClient.hx | 68 +++++++++ src/tink/http/clients/NodeClient2.hx | 74 ++++++++++ src/tink/http/containers/NodeContainer2.hx | 158 +++++++++++++++++++++ tests/Test.hx | 111 +++++++++++++-- 12 files changed, 437 insertions(+), 21 deletions(-) create mode 100644 haxe_libraries/hxnodejs-http2.hxml create mode 100644 http2.hxml create mode 100644 package-lock.json create mode 100644 src/tink/http/clients/FetchClient.hx create mode 100644 src/tink/http/clients/NodeClient2.hx create mode 100644 src/tink/http/containers/NodeContainer2.hx diff --git a/.gitignore b/.gitignore index c668920..c86478c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ bin node_modules -*.hxproj \ No newline at end of file +*.hxproj +*.pem +*.js \ No newline at end of file diff --git a/.haxerc b/.haxerc index 8b6b442..372dbd4 100644 --- a/.haxerc +++ b/.haxerc @@ -1,4 +1,4 @@ { - "version": "4.0.5", + "version": "4.1.2", "resolveLibs": "scoped" } \ No newline at end of file diff --git a/haxe_libraries/hxnodejs-http2.hxml b/haxe_libraries/hxnodejs-http2.hxml new file mode 100644 index 0000000..8b2f31b --- /dev/null +++ b/haxe_libraries/hxnodejs-http2.hxml @@ -0,0 +1,4 @@ +# @install: lix --silent download "gh://github.com/brave-pi/hxnodejs-http2#a0486e5208367acf0c2779f9ece5000348efca50" into hxnodejs-http2/0.0.0/github/a0486e5208367acf0c2779f9ece5000348efca50 +-lib hxnodejs +-cp ${HAXE_LIBCACHE}/hxnodejs-http2/0.0.0/github/a0486e5208367acf0c2779f9ece5000348efca50/. +-D hxnodejs-http2=0.0.0 \ No newline at end of file diff --git a/haxe_libraries/hxnodejs.hxml b/haxe_libraries/hxnodejs.hxml index 8f6b16a..61385dc 100644 --- a/haxe_libraries/hxnodejs.hxml +++ b/haxe_libraries/hxnodejs.hxml @@ -1,6 +1,7 @@ --D hxnodejs=6.9.1 -# @install: lix --silent download "gh://github.com/haxefoundation/hxnodejs#38bdefd853f8d637ffb6e74c69ccaedc01985cac" into hxnodejs/6.9.1/github/38bdefd853f8d637ffb6e74c69ccaedc01985cac --cp ${HAXE_LIBCACHE}/hxnodejs/6.9.1/github/38bdefd853f8d637ffb6e74c69ccaedc01985cac/src +# @install: lix --silent download "gh://github.com/haxefoundation/hxnodejs#276d357b9d3905475ebb3f9ebc15e27b0505aec7" into hxnodejs/12.1.0/github/276d357b9d3905475ebb3f9ebc15e27b0505aec7 +-cp ${HAXE_LIBCACHE}/hxnodejs/12.1.0/github/276d357b9d3905475ebb3f9ebc15e27b0505aec7/src +-D hxnodejs=12.1.0 --macro allowPackage('sys') # should behave like other target defines and not be defined in macro context --macro define('nodejs') +--macro _internal.SuppressDeprecated.run() diff --git a/http2.hxml b/http2.hxml new file mode 100644 index 0000000..783a7f6 --- /dev/null +++ b/http2.hxml @@ -0,0 +1,10 @@ +-cp tests +-dce full +-lib hxnodejs-http2 +-main Test +--next +-D client +-js bin/test/client.js +--next +-D server +-js bin/test/server.js \ No newline at end of file diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..a6f561f --- /dev/null +++ b/package-lock.json @@ -0,0 +1,13 @@ +{ + "name": "tink_http", + "version": "0.0.0", + "lockfileVersion": 1, + "requires": true, + "dependencies": { + "lix": { + "version": "15.9.0", + "resolved": "https://registry.npmjs.org/lix/-/lix-15.9.0.tgz", + "integrity": "sha512-XyIVUZ9Mr3DNPXwbvKrAS3t1XeeBugbrva/YoMUQSJLrmCwVNlx1/ishj4bkZ/WdH1x0m1Smd767zwyr6wy/8g==" + } + } +} diff --git a/package.json b/package.json index 37e2ddc..f779cbb 100644 --- a/package.json +++ b/package.json @@ -8,5 +8,7 @@ "travix": "haxe -lib travix --run travix.Travix", "postinstall": "lix download" }, - "dependencies": {} + "dependencies": { + "lix": "^15.9.0" + } } diff --git a/src/tink/http/Response.hx b/src/tink/http/Response.hx index 1677107..03adfac 100644 --- a/src/tink/http/Response.hx +++ b/src/tink/http/Response.hx @@ -85,6 +85,9 @@ abstract OutgoingResponse(OutgoingResponseData) { //TODO: implement } + public static function ofStream(source:IdealSource, ?code = 200, ?headers:Array) + return new OutgoingResponse(new ResponseHeader(code, code, headers == null ? [] : headers), source); + @:from static function ofString(s:String) return blob(s, 'text/plain'); diff --git a/src/tink/http/clients/FetchClient.hx b/src/tink/http/clients/FetchClient.hx new file mode 100644 index 0000000..7abb24a --- /dev/null +++ b/src/tink/http/clients/FetchClient.hx @@ -0,0 +1,68 @@ +// #if hxjs_http2 +// package tink.http.clients; + +// import haxe.DynamicAccess; +// import tink.io.Source; +// import tink.io.Sink; +// import tink.http.Client; +// import tink.http.Header; +// import tink.http.Request; +// import tink.http.Response; + +// using tink.CoreApi; + +// class FetchClient implements ClientObject { + + +// public function new() { + +// } + +// public function request(req:OutgoingRequest):Promise { +// var headers = { +// var map = new OutgoingHttpHeaders(); +// map[":path"] = [req.header.url.pathWithQuery]; +// map[":method"] = [req.header.method]; +// for (h in req.header) +// map[h.name] = [h.value]; +// map; +// } + +// return fetchRequest(headers, req); +// } + +// function fetchRequest(headers:IncomingHttpHeaders, req:OutgoingRequest):Promise +// return Future.async(function(cb) { +// var fwd:js.node.http2.Stream.Http2Stream = agent.request(headers, {endStream: false}); +// fwd.on('response', function(headers:IncomingHttpHeaders, _) { +// var status = headers[":status"]; +// if (status.length < 0) +// cb(Failure(new Error('Missing status code'))); +// else { +// var code = Std.parseInt(status); +// cb(Success(new IncomingResponse(new ResponseHeader(code, null, [ +// for (key in headers.keys()) +// new HeaderField(key, headers[key]) +// ], HTTP2), Source.ofNodeStream('Response from ${req.header.url}', fwd)))); +// } +// }); +// function fail(e:Error) +// cb(Failure(e)); +// fwd.on('error', function(e:#if haxe4 js.lib.Error #else js.Error #end) fail(Error.withData(e.message, e))); +// fwd.on('end', () -> { +// agent.close(); +// }); +// req.body.pipeTo(Sink.ofNodeStream('Request to ${req.header.url}', fwd)).handle(function(res) { +// trace('done'); +// fwd.end(); +// switch res { +// case AllWritten: +// case SinkEnded(_): +// fail(new Error(502, 'Gateway Error')); +// case SinkFailed(e, _): +// fail(e); +// } +// }); +// }); +// } +// #end \ No newline at end of file diff --git a/src/tink/http/clients/NodeClient2.hx b/src/tink/http/clients/NodeClient2.hx new file mode 100644 index 0000000..f82c54b --- /dev/null +++ b/src/tink/http/clients/NodeClient2.hx @@ -0,0 +1,74 @@ +#if hxnodejs_http2 +package tink.http.clients; + +import js.node.http2.Session.ClientHttp2Session; +import haxe.DynamicAccess; +import tink.io.Source; +import tink.io.Sink; +import tink.http.Client; +import tink.http.Header; +import tink.http.Request; +import tink.http.Response; +import js.node.http.IncomingMessage; +import js.node.Http2; +import js.node.http2.Session; + +using tink.CoreApi; + +class NodeClient2 implements ClientObject { + var opts:js.node.Http2.SecureClientSessionOptions; + var agent:ClientHttp2Session; + + public function new(url:tink.Url, ?opts) { + this.opts = opts; + this.agent = js.node.Http2.connect('${url.scheme}://${url.host}', opts); + } + + public function request(req:OutgoingRequest):Promise { + var headers = { + var map = new OutgoingHttpHeaders(); + map[":path"] = [req.header.url.pathWithQuery]; + map[":method"] = [req.header.method]; + for (h in req.header) + map[h.name] = [h.value]; + map; + } + + return nodeRequest(headers, req); + } + + function nodeRequest(headers:IncomingHttpHeaders, req:OutgoingRequest):Promise + return Future.async(function(cb) { + var fwd:js.node.http2.Stream.Http2Stream = agent.request(headers, {endStream: false}); + fwd.on('response', function(headers:IncomingHttpHeaders, _) { + var status = headers[":status"]; + if (status.length < 0) + cb(Failure(new Error('Missing status code'))); + else { + var code = Std.parseInt(status); + cb(Success(new IncomingResponse(new ResponseHeader(code, null, [ + for (key in headers.keys()) + new HeaderField(key, headers[key]) + ], HTTP2), Source.ofNodeStream('Response from ${req.header.url}', fwd)))); + } + }); + function fail(e:Error) + cb(Failure(e)); + fwd.on('error', function(e:#if haxe4 js.lib.Error #else js.Error #end) fail(Error.withData(e.message, e))); + fwd.on('end', () -> { + agent.close(); + }); + req.body.pipeTo(Sink.ofNodeStream('Request to ${req.header.url}', fwd)).handle(function(res) { + trace('done'); + fwd.end(); + switch res { + case AllWritten: + case SinkEnded(_): + fail(new Error(502, 'Gateway Error')); + case SinkFailed(e, _): + fail(e); + } + }); + }); +} +#end \ No newline at end of file diff --git a/src/tink/http/containers/NodeContainer2.hx b/src/tink/http/containers/NodeContainer2.hx new file mode 100644 index 0000000..bf1b4c5 --- /dev/null +++ b/src/tink/http/containers/NodeContainer2.hx @@ -0,0 +1,158 @@ +#if hxnodejs_http2 +package tink.http.containers; + +import js.node.Https.HttpsCreateServerOptions; +import js.node.Http2.Http2ServerResponse; +import js.node.Http2.Http2ServerRequest; +import tink.http.Container; +import tink.http.Request; +import tink.http.Header; +import tink.io.*; +import js.node.http.*; +import #if haxe4 js.lib.Error #else js.Error #end as JsError; + +using tink.CoreApi; + +class NodeContainer2 implements Container { + var upgradable:Bool; + var kind:ServerKind; + + public function new(kind:ServerKind) { + this.kind = kind; + } + + static public function toNodeHandler(handler:Handler, ?options:{?body:Http2ServerRequest->IncomingRequestBody}) { + var body = switch options { + case null | {body: null}: function(msg:Http2ServerRequest) return + Plain(Source.ofNodeStream('Incoming HTTP message from ${(msg.socket : Dynamic).remoteEndpoint}', msg.stream)); + case _: options.body; + } + return function(req:Http2ServerRequest, + res:Http2ServerResponse) handler.process(new IncomingRequest((req.socket : Dynamic).remoteAddress, + IncomingRequestHeader.fromIncomingMessage((req : Dynamic)), body(req))) + .handle(function(out) { + var headers = new Map(); + for (h in out.header) { + if (!headers.exists(h.name)) + headers[h.name] = []; + headers[h.name].push(h.value); + } + for (name in headers.keys()) + res.setHeader(name, headers[name]); + res.writeHead(out.header.statusCode, out.header.reason); // TODO: readable status code + out.body.pipeTo(Sink.ofNodeStream('Outgoing HTTP response to ${(req.socket : Dynamic).remoteAddress}', res.stream)).handle(function(x) { + res.end(); + }); + }); + } + + public function runSecure(?cfg:Null, handler:Handler) + return Future.async(function(cb) { + var failures = Signal.trigger(); + var createServer = if (cfg == null) () -> js.node.Http2.createSecureServer() + else + () -> js.node.Http2.createSecureServer(cfg); + boot(cast createServer, failures, handler).handle(cb); + }); + + public function runWithOptions(?cfg:Null, handler:Handler) + return Future.async(function(cb) { + var failures = Signal.trigger(); + var createServer = if (cfg == null) () -> js.node.Http2.createServer() + else + () -> js.node.Http2.createServer(cfg); + boot(cast createServer, failures, handler).handle(cb); + }); + + public function run(handler:Handler) + return Future.async(function(cb) { + var failures = Signal.trigger(); + var createServer = () -> js.node.Http2.createServer(); + boot(cast createServer, failures, handler).handle(cb); + }); + + function boot(createServer:Void->ServerLike, failures:Signal, handler:Handler) + return Future.async(function(cb) { + var server:ServerLike = switch kind { + case Instance(server): + server; + case Port(port): + var server = createServer(); + server.listen(port); + server; + case Host(host): + var server = createServer(); + server.listen('${host.name}:${host.port}'); + server; + case Path(path): + var server = createServer(); + server.listen(path); + server; + case Fd(fd): + var server = createServer(); + server.listen(fd); + server; + } + server.on('error', function(e) { + cb(Failed(e)); + }); + function onListen() { + cb(Running({ + shutdown: function(hard:Bool) { + if (hard) + trace('Warning: hard shutdown not implemented'); + + return Future.async(function(cb) { + server.close(function() cb(true)); + }); + }, + failures: failures, // TODO: these need to be triggered + })); + } + if (untyped server.listening) // .listening added in v5.7.0, not added to hxnodejs yet + onListen() + else + server.on('listening', onListen); + server.on('request', toNodeHandler(handler)); + server.on('error', function(e) cb(Failed(e))); + }); +} + +typedef ServerLike = { + var listening(default, null):Bool; + function listen(i:Dynamic, ?cb:NullVoid>):Void; + function on(type:String, cb:haxe.Constraints.Function):Void; + function close(cb:() -> Void):Void; +} + +private enum ServerKindBase { + Instance(server:ServerLike); + Port(port:Int); + Host(host:tink.url.Host); + Path(path:String); + Fd(fd:{fd:Int}); +} + +abstract ServerKind(ServerKindBase) from ServerKindBase to ServerKindBase { + @:from + public static inline function fromInstance(server:ServerLike):ServerKind + return Instance(server); + + @:from + public static inline function fromPort(port:Int):ServerKind + return Port(port); + + @:from + public static inline function fromHost(host:tink.url.Host):ServerKind + return Host(host); + + @:from + public static inline function fromPath(path:String):ServerKind + return Path(path); + + @:from + public static inline function fromFd(fd:{fd:Int}):ServerKind + return Fd(fd); +} + +#end \ No newline at end of file diff --git a/tests/Test.hx b/tests/Test.hx index 2ff29b7..b40b27a 100644 --- a/tests/Test.hx +++ b/tests/Test.hx @@ -1,20 +1,101 @@ -package ; +package; -import tink.http.containers.NodeContainer; -import tink.http.containers.TcpContainer; +import tink.http.Handler; +import tink.http.Request; +import tink.http.containers.NodeContainer2; +import tink.http.clients.NodeClient2; +import tink.http.Container; import tink.http.Response; - +import tink.io.Source; +import tink.http.StructuredBody; +import tink.streams.Stream; +import tink.http.Header; +using tink.io.Source; using tink.CoreApi; class Test { - static function main() { - haxe.Log.trace = function (v:Dynamic, ?pos:haxe.PosInfos) { - js.Node.console.log(pos.fileName + ':' + pos.lineNumber, v); - } - var port = tink.tcp.nodejs.NodejsAcceptor.inst.bind.bind(12345); - var c = new TcpContainer(port); - c.run(function (req) { - return Future.sync(('hello, world' : OutgoingResponse)); - }); - } -} \ No newline at end of file + static function main() { + function handleRunningState(state:RunningState) { + state.failures.handle(f -> { + trace(f); + }); + } + #if client + trace('In this demo, the client multiplexes two bi-directional streams to the server over one HTTP connection.'); + var agent = new NodeClient2("https://localhost:8080", { + ca: [js.node.Fs.readFileSync('localhost-cert.pem')] + }); + var requestStream = Signal.trigger(); + var id = 0; + function client() { + agent.request(new OutgoingRequest(new OutgoingRequestHeader(GET, "/", "HTTP/2", [new HeaderField("tink-stream-id", id++)]), + new SignalStream(requestStream))) + .next(r -> { + r.body.chunked().forEach(c -> { + trace('Client received: $c'); + Resume; + }).eager(); + }) + .eager(); + } + client(); + client(); + var rl = js.node.Readline.createInterface({ + input: js.Node.process.stdin, + output: js.Node.process.stdout + }); + function loop() + rl.question("Message: ", res -> { + var closing = res == "close"; + requestStream.trigger(Data(tink.Chunk.ofString(res))); + if (closing) { + requestStream.trigger(End); + rl.close(); + } else + loop(); + }); + loop(); + #elseif server + var container = new NodeContainer2(8080); + function server() { + container.runSecure({ + cert: js.node.Fs.readFileSync('./localhost-cert.pem'), + key: js.node.Fs.readFileSync('./localhost-privkey.pem'), + }, (req:IncomingRequest) -> { + var id = req.header.get("tink-stream-id").join(';'); + trace('Incoming stream: $id'); + var outStream:IdealSource = switch req.body { + case Plain(source): + var ret = source.chunked().map((s:tink.Chunk) -> { + var payload = tink.Chunk.ofString('Response to $id: $s'); + trace('sending $payload'); + Promise.resolve(payload); + }).idealize(e -> [tink.Chunk.ofString('Error: $e')].iterator()); + cast ret; + case Parsed(parts): + var ret = parts.map(part -> switch part.value { + case Value(text): tink.Chunk.ofString('${part.name}: $text'); + default: tink.Chunk.ofString('error'); + }).iterator(); + ret; + } + outStream.split(tink.Chunk.ofString('close')).after.chunked().forEach(c -> Resume).next(_->Noise).handle(() -> { + trace('Done with stream $id'); + }); + Future.sync(OutgoingResponse.ofStream(outStream)); + }).next(r -> { + switch r { + case Running(running): + handleRunningState(running); + case Failed(e): + trace(e); + case Shutdown: + trace("Shutdown"); + } + Noise; + }).eager(); + } + server(); + #end + } +} From 554f7c804d96323e3de40638f6610e1e8e6dab1b Mon Sep 17 00:00:00 2001 From: Gabriel Hayes Date: Mon, 20 Jul 2020 22:50:37 -0500 Subject: [PATCH 2/2] Fix http2 hxml --- http2.hxml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/http2.hxml b/http2.hxml index 783a7f6..01761f4 100644 --- a/http2.hxml +++ b/http2.hxml @@ -1,8 +1,11 @@ + -cp tests -dce full +-lib tink_http +-lib hxnodejs -lib hxnodejs-http2 -main Test ---next +--each -D client -js bin/test/client.js --next