Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 20 additions & 30 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
const { PassThrough } = require('stream');
const { as: asAsyncIterable } = require('ix/asynciterable');
const { AsyncByteQueue, AsyncByteStream, RecordBatchReader } = require('apache-arrow');

const AsyncQueue = Object.getPrototypeOf(AsyncByteQueue);

const { RecordBatchReader } = require('apache-arrow');

// fastify@5 + apache-arrow@21 compat (graphistry fork, `fastify5` branch).
// Changes from the v3 `graphistry` branch:
// 1. addContentTypeParser content-type must be a full MIME under fastify@5
// ('octet-stream' -> 'application/octet-stream'; fastify@5 rejects the bare token via
// ContentType.isValid === false -> FST_ERR_CTP_INVALID_TYPE).
// 2. fastify-plugin metadata constraint bumped to '5.x'.
// 3. Multipart support DROPPED. Every fastify-arrow consumer in the graphistry stack
// (gpu-worker /post|/encode|/filter|..., /open|/read|/play|/stop|/tick; sessions; gpu-router)
// sends/receives application/octet-stream only; nothing posts multipart/form-data to a
// fastify-arrow endpoint (the one multipart sender, shapeGraphViaForm, targets
// forge-etl-python, not this plugin). Dropping @fastify/multipart also avoids the
// v6->v9 callback-API rewrite (`request.multipart()` -> parts()/file()) that fastify@5
// would otherwise force.
module.exports = require('fastify-plugin')(fastifyArrowPlugin, {
fastify: '>= 2.x', name: 'fastify-arrow'
fastify: '5.x', name: 'fastify-arrow'
});

function fastifyArrowPlugin(fastify, opts, next) {

if (!fastify.hasRequestDecorator('multipart')) {
fastify.register(require('@fastify/multipart'), opts);
}

// Add a stub octet-stream parser so fastify doesn't reject payloads with content-type octet-stream
fastify.addContentTypeParser('octet-stream', opts, async (request, payload, done) => { return payload; });
// Stub octet-stream parser so fastify doesn't reject Arrow IPC payloads.
fastify.addContentTypeParser('application/octet-stream', async (request, payload) => payload);

fastify.decorateReply('stream', replyAsStream);
fastify.decorateRequest('recordBatches', readRecordBatches);
Expand All @@ -25,31 +32,14 @@ function fastifyArrowPlugin(fastify, opts, next) {

function replyAsStream(xs = { objectMode: false }) {
const stream = new PassThrough(xs);
this.send(stream)
this.send(stream);
return stream;
}

/**
* @returns AsyncIterable<RecordBatchReader>
*/
function readRecordBatches() {
const source = this.isMultipart()
? fromMultipart(this)
: this.raw.pipe(new PassThrough());
const source = this.raw.pipe(new PassThrough());
return asAsyncIterable(RecordBatchReader.readAll(source));
}

async function* fromMultipart(request) {

const files = new AsyncQueue();
const body = request.body || (request.body = {});

request.multipart(
(_field, file, _name) => { files.write(file); },
(err) => { err != null ? files.abort(err) : files.close(); }
).on('field', (k, v) => body[k] = v);

for await (const file of files) {
yield* new AsyncByteStream(file);
}
}
Loading