Skip to content

Commit

Permalink
Add asStream support
Browse files Browse the repository at this point in the history
  • Loading branch information
delvedor committed Dec 16, 2021
1 parent e3a1fed commit 31baef5
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 19 deletions.
5 changes: 4 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ export type {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestResponse
ConnectionRequestOptionsAsStream,
ConnectionRequestResponse,
ConnectionRequestResponseAsStream
} from './lib/connection'

export type {
Expand All @@ -47,6 +49,7 @@ export type {
} from './lib/pool'

export type {
TransportOptions,
TransportRequestParams,
TransportRequestOptions,
TransportRequestOptionsWithMeta,
Expand Down
20 changes: 9 additions & 11 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,6 @@ export default class Transport {
)
}

// TODO: fixme
// if (options.asStream === true) params.asStream = true

// handle compression
if (connectionParams.body !== '' && connectionParams.body != null) {
if (isStream(connectionParams.body)) {
Expand Down Expand Up @@ -454,7 +451,9 @@ export default class Transport {
maxResponseSize,
maxCompressedResponseSize,
signal,
timeout: toMs(options.requestTimeout != null ? options.requestTimeout : this[kRequestTimeout])
timeout: toMs(options.requestTimeout != null ? options.requestTimeout : this[kRequestTimeout]),
...(options.asStream === true ? { asStream: true } : null)
// asStream: options.asStream ? true : undefined
})
result.statusCode = statusCode
result.headers = headers
Expand All @@ -463,6 +462,12 @@ export default class Transport {
throw new ProductNotSupportedError(this[kProductCheck] as string, result)
}

if (options.asStream === true) {
result.body = body
this[kDiagnostic].emit('response', null, result)
return returnMeta ? result : body
}

const contentEncoding = (headers['content-encoding'] ?? '').toLowerCase()
if (contentEncoding.includes('gzip') || contentEncoding.includes('deflate')) {
body = await unzip(body)
Expand All @@ -473,13 +478,6 @@ export default class Transport {
body = body.toString()
}

// TODO: fixme
// if (options.asStream === true) {
// result.body = response
// this[kDiagnostic].emit('response', null, result)
// return result
// }

const isHead = params.method === 'HEAD'
// we should attempt the payload deserialization only if:
// - a `content-type` is defined and is equal to `application/json`
Expand Down
15 changes: 13 additions & 2 deletions src/connection/BaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,26 @@ export interface ConnectionRequestOptions {
context: any
maxResponseSize?: number
maxCompressedResponseSize?: number
asStream?: boolean
signal?: AbortSignal
timeout?: number
}

export interface ConnectionRequestOptionsAsStream extends ConnectionRequestOptions {
asStream: true
}

export interface ConnectionRequestResponse {
body: string | Buffer
headers: http.IncomingHttpHeaders
statusCode: number
}

export interface ConnectionRequestResponseAsStream {
body: ReadableStream
headers: http.IncomingHttpHeaders
statusCode: number
}

export default class BaseConnection {
url: URL
tls: TlsConnectionOptions | null
Expand Down Expand Up @@ -127,7 +136,9 @@ export default class BaseConnection {
}

/* istanbul ignore next */
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse>
async request (params: ConnectionRequestParams, options: ConnectionRequestOptionsAsStream): Promise<ConnectionRequestResponseAsStream>
async request (params: ConnectionRequestParams, options: any): Promise<any> {
throw new ConfigurationError('The request method should be implemented by extended classes')
}

Expand Down
17 changes: 15 additions & 2 deletions src/connection/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* under the License.
*/

/* eslint-disable @typescript-eslint/restrict-template-expressions */

import hpagent from 'hpagent'
import http from 'http'
import https from 'https'
Expand All @@ -27,7 +29,9 @@ import BaseConnection, {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestOptionsAsStream,
ConnectionRequestResponse,
ConnectionRequestResponseAsStream,
getIssuerCertificate
} from './BaseConnection'
import { kCaFingerprint } from '../symbols'
Expand Down Expand Up @@ -87,7 +91,9 @@ export default class HttpConnection extends BaseConnection {
: https.request
}

async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse>
async request (params: ConnectionRequestParams, options: ConnectionRequestOptionsAsStream): Promise<ConnectionRequestResponseAsStream>
async request (params: ConnectionRequestParams, options: any): Promise<any> {
return await new Promise((resolve, reject) => {
let cleanedListeners = false

Expand All @@ -109,7 +115,6 @@ export default class HttpConnection extends BaseConnection {

this._openRequests++
if (options.signal != null) {
// @ts-expect-error
options.signal.addEventListener(
'abort',
() => request.abort(),
Expand All @@ -121,6 +126,14 @@ export default class HttpConnection extends BaseConnection {
cleanListeners()
this._openRequests--

if (options.asStream === true) {
return resolve({
body: response,
statusCode: response.statusCode as number,
headers: response.headers
})
}

const contentEncoding = (response.headers['content-encoding'] ?? '').toLowerCase()
const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate')
const isVectorTile = (response.headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile')
Expand Down
17 changes: 15 additions & 2 deletions src/connection/UndiciConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* under the License.
*/

/* eslint-disable @typescript-eslint/restrict-template-expressions */

import { EventEmitter } from 'events'
import Debug from 'debug'
import buffer from 'buffer'
Expand All @@ -26,7 +28,9 @@ import BaseConnection, {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestOptionsAsStream,
ConnectionRequestResponse,
ConnectionRequestResponseAsStream,
getIssuerCertificate
} from './BaseConnection'
import { Pool, buildConnector, Dispatcher } from 'undici'
Expand Down Expand Up @@ -109,7 +113,9 @@ export default class Connection extends BaseConnection {
this.pool = new Pool(this.url.toString(), undiciOptions)
}

async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse>
async request (params: ConnectionRequestParams, options: ConnectionRequestOptionsAsStream): Promise<ConnectionRequestResponseAsStream>
async request (params: ConnectionRequestParams, options: any): Promise<any> {
const maxResponseSize = options.maxResponseSize ?? MAX_STRING_LENGTH
const maxCompressedResponseSize = options.maxCompressedResponseSize ?? MAX_BUFFER_LENGTH
const requestParams = {
Expand All @@ -135,7 +141,6 @@ export default class Connection extends BaseConnection {
timeoutId = setTimeout(() => {
timedout = true
if (options.signal != null) {
// @ts-expect-error
options.signal.dispatchEvent('abort')
} else {
this[kEmitter].emit('abort')
Expand Down Expand Up @@ -168,6 +173,14 @@ export default class Connection extends BaseConnection {
}
}

if (options.asStream === true) {
return {
statusCode: response.statusCode,
headers: response.headers,
body: response.body
}
}

const contentEncoding = (response.headers['content-encoding'] ?? '').toLowerCase()
const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate') // eslint-disable-line
const isVectorTile = (response.headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile')
Expand Down
4 changes: 3 additions & 1 deletion src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ export type {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestResponse
ConnectionRequestOptionsAsStream,
ConnectionRequestResponse,
ConnectionRequestResponseAsStream
} from './BaseConnection'

export {
Expand Down

0 comments on commit 31baef5

Please sign in to comment.