Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions examples/query_tags.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Example: attaching query tags at the session level and at the statement level.
const { DBSQLClient } = require('..');

const client = new DBSQLClient();

// Connection settings are taken from the environment.
const host = process.env.DATABRICKS_HOST;
const path = process.env.DATABRICKS_HTTP_PATH;
const token = process.env.DATABRICKS_TOKEN;

client
  .connect({ host, path, token })
  .then(async (connectedClient) => {
    // try/finally guarantees the connection is closed even if a statement below fails.
    try {
      // Session-level query tags: applied to every statement run on this session
      // (serialized into the session's QUERY_TAGS configuration).
      const session = await connectedClient.openSession({
        queryTags: {
          team: 'engineering',
          env: 'dev',
          driver: 'node',
        },
      });

      // Likewise, always close the session once it has been opened.
      try {
        // Statement A: inherits session-level tags only.
        const opA = await session.executeStatement('SELECT 1 AS inherits_session_tags');
        console.log(await opA.fetchAll());
        await opA.close();

        // Statement B: statement-level query tags via executeStatement options.
        // These are passed via confOverlay as "query_tags" and apply ONLY to this statement.
        // Note: `env` here overrides the session-level `env: 'dev'` — for this statement
        // it will be `env: 'prod'`. Subsequent statements without statement-level tags
        // revert to the session-level values.
        const opB = await session.executeStatement('SELECT 2 AS has_statement_tags', {
          queryTags: {
            env: 'prod',
            request_id: 'abc-123',
            feature: 'reporting',
          },
        });
        console.log(await opB.fetchAll());
        await opB.close();

        // Statement C: demonstrates escaping of special characters (`\`, `:`, `,`)
        // in tag values, plus null/undefined values which serialize as bare keys.
        const opC = await session.executeStatement('SELECT 3 AS escaped_and_null_tags', {
          queryTags: {
            path: 'C:\\users\\me',
            note: 'hello, world',
            flag: null,
          },
        });
        console.log(await opC.fetchAll());
        await opC.close();
      } finally {
        await session.close();
      }
    } finally {
      await connectedClient.close();
    }
  })
  .catch((error) => {
    // Report failures on stderr so they are not mixed with query output.
    console.error(error);
  });
6 changes: 5 additions & 1 deletion examples/session_params.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ client
.connect({ host, path, token })
.then(async (client) => {
const session = await client.openSession({
queryTags: {
team: 'engineering',
test: 'session-params',
driver: 'node',
},
configuration: {
QUERY_TAGS: 'team:engineering,test:session-params,driver:node',
ansi_mode: 'false',
},
});
Expand Down
12 changes: 11 additions & 1 deletion lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import HttpConnection from './connection/connections/HttpConnection';
import IConnectionOptions from './connection/contracts/IConnectionOptions';
import Status from './dto/Status';
import HiveDriverError from './errors/HiveDriverError';
import { buildUserAgentString, definedOrError } from './utils';
import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils';
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth';
import {
Expand Down Expand Up @@ -292,6 +292,16 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true';
}

// Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS
if (request.queryTags !== undefined) {
const serialized = serializeQueryTags(request.queryTags);
if (serialized) {
configuration['QUERY_TAGS'] = serialized;
} else {
delete configuration['QUERY_TAGS'];
}
}

const response = await this.driver.openSession({
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8),
...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema),
Expand Down
7 changes: 6 additions & 1 deletion lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import IOperation from './contracts/IOperation';
import DBSQLOperation from './DBSQLOperation';
import Status from './dto/Status';
import InfoValue from './dto/InfoValue';
import { definedOrError, LZ4, ProtocolVersion } from './utils';
import { definedOrError, LZ4, ProtocolVersion, serializeQueryTags } from './utils';
import CloseableCollection from './utils/CloseableCollection';
import { LogLevel } from './contracts/IDBSQLLogger';
import HiveDriverError from './errors/HiveDriverError';
Expand Down Expand Up @@ -227,6 +227,11 @@ export default class DBSQLSession implements IDBSQLSession {
request.parameters = getQueryParameters(options.namedParameters, options.ordinalParameters);
}

const serializedQueryTags = serializeQueryTags(options.queryTags);
if (serializedQueryTags !== undefined) {
request.confOverlay = { ...request.confOverlay, query_tags: serializedQueryTags };
}

if (ProtocolVersion.supportsCloudFetch(this.serverProtocolVersion)) {
request.canDownloadResult = options.useCloudFetch ?? clientConfig.useCloudFetch;
}
Expand Down
6 changes: 6 additions & 0 deletions lib/contracts/IDBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ export interface OpenSessionRequest {
initialCatalog?: string;
initialSchema?: string;
configuration?: { [key: string]: string };
/**
* Session-level query tags as key-value pairs. Serialized and passed via session configuration
* as "QUERY_TAGS". Values may be null/undefined to include a key without a value.
* If both queryTags and configuration.QUERY_TAGS are specified, queryTags takes precedence.
*/
queryTags?: Record<string, string | null | undefined>;
}

export default interface IDBSQLClient {
Expand Down
6 changes: 6 additions & 0 deletions lib/contracts/IDBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ export type ExecuteStatementOptions = {
stagingAllowedLocalPath?: string | string[];
namedParameters?: Record<string, DBSQLParameter | DBSQLParameterValue>;
ordinalParameters?: Array<DBSQLParameter | DBSQLParameterValue>;
/**
* Per-statement query tags as key-value pairs. Serialized and passed via confOverlay
* as "query_tags". Values may be null/undefined to include a key without a value.
* These tags apply only to this statement and do not persist across queries.
*/
queryTags?: Record<string, string | null | undefined>;
};

export type TypeInfoRequest = {
Expand Down
11 changes: 10 additions & 1 deletion lib/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,14 @@ import buildUserAgentString from './buildUserAgentString';
import formatProgress, { ProgressUpdateTransformer } from './formatProgress';
import LZ4 from './lz4';
import * as ProtocolVersion from './protocolVersion';
import { serializeQueryTags } from './queryTags';

export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer, LZ4, ProtocolVersion };
export {
definedOrError,
buildUserAgentString,
formatProgress,
ProgressUpdateTransformer,
LZ4,
ProtocolVersion,
serializeQueryTags,
};
35 changes: 35 additions & 0 deletions lib/utils/queryTags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
 * Serializes a query tags dictionary into the string form sent to the server,
 * either as the session configuration entry "QUERY_TAGS" or as the per-statement
 * confOverlay entry "query_tags".
 *
 * Format: comma-separated key:value pairs, e.g. "key1:value1,key2:value2"
 * - If a value is null or undefined, the key is included without a colon or value
 * - Backslashes in keys are escaped; other special characters in keys are not escaped
 * - Special characters (backslash, colon, comma) in values are backslash-escaped
 *
 * NOTE(review): a reviewer asked whether escaping should happen in the driver at all,
 * recalling that for SEA it happens on the server side. Only Thrift is supported here;
 * confirm that the server expects pre-escaped input.
 *
 * @param queryTags - dictionary of query tag key-value pairs
 * @returns serialized string, or undefined if input is empty/null/undefined
 */
export function serializeQueryTags(
  queryTags: Record<string, string | null | undefined> | null | undefined,
): string | undefined {
  if (queryTags == null) {
    return undefined;
  }

  const keys = Object.keys(queryTags);
  if (keys.length === 0) {
    return undefined;
  }

  return keys
    .map((key) => {
      // Only the backslash is escaped in keys (see the format notes above).
      const escapedKey = key.replace(/\\/g, '\\\\');
      const value = queryTags[key];
      if (value == null) {
        // Bare key: no colon, no value.
        return escapedKey;
      }
      // Prefix every backslash, colon and comma with a backslash; `$&` is the matched character.
      const escapedValue = value.replace(/[\\:,]/g, '\\$&');
      return `${escapedKey}:${escapedValue}`;
    })
    .join(',');
}
45 changes: 45 additions & 0 deletions tests/unit/DBSQLClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -636,4 +636,49 @@ describe('DBSQLClient.enableMetricViewMetadata', () => {
'spark.sql.thriftserver.metadata.metricview.enabled': 'true',
});
});

// A queryTags dictionary passed to openSession should show up in the request captured by
// the Thrift client stub as a single serialized QUERY_TAGS configuration entry.
it('should serialize queryTags dict and set in session configuration', async () => {
  const dbsqlClient = new DBSQLClient();
  const thriftStub = new ThriftClientStub();
  sinon.stub(dbsqlClient, 'getClient').returns(Promise.resolve(thriftStub));

  await dbsqlClient.openSession({ queryTags: { team: 'data-eng', project: 'etl' } });

  const sentConfiguration = thriftStub.openSessionReq?.configuration;
  expect(sentConfiguration).to.deep.equal({
    QUERY_TAGS: 'team:data-eng,project:etl',
  });
});

// When both queryTags and configuration.QUERY_TAGS are given, the dictionary wins and
// unrelated configuration entries are passed through unchanged.
it('should let queryTags take precedence over configuration.QUERY_TAGS', async () => {
  const dbsqlClient = new DBSQLClient();
  const thriftStub = new ThriftClientStub();
  sinon.stub(dbsqlClient, 'getClient').returns(Promise.resolve(thriftStub));

  await dbsqlClient.openSession({
    queryTags: { team: 'new-team' },
    configuration: { QUERY_TAGS: 'team:old-team,other:value', ansi_mode: 'true' },
  });

  const sentConfiguration = thriftStub.openSessionReq?.configuration;
  expect(sentConfiguration).to.deep.equal({
    QUERY_TAGS: 'team:new-team',
    ansi_mode: 'true',
  });
});

// An explicitly empty queryTags dictionary should drop any QUERY_TAGS entry supplied via
// configuration while leaving the other configuration entries in place.
it('should remove QUERY_TAGS from configuration when queryTags is empty', async () => {
  const dbsqlClient = new DBSQLClient();
  const thriftStub = new ThriftClientStub();
  sinon.stub(dbsqlClient, 'getClient').returns(Promise.resolve(thriftStub));

  await dbsqlClient.openSession({
    queryTags: {},
    configuration: { QUERY_TAGS: 'team:old-team', ansi_mode: 'true' },
  });

  const sentConfiguration = thriftStub.openSessionReq?.configuration;
  expect(sentConfiguration).to.deep.equal({ ansi_mode: 'true' });
});
});
38 changes: 38 additions & 0 deletions tests/unit/DBSQLSession.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,44 @@ describe('DBSQLSession', () => {
});
});

// Checks how executeStatement translates the `queryTags` option into the `query_tags`
// entry of the request's confOverlay, by inspecting the request passed to a spied driver.
describe('executeStatement with queryTags', () => {
  it('should set confOverlay with query_tags when queryTags are provided', async () => {
    const clientContext = new ClientContextStub();
    const driverSpy = sinon.spy(clientContext.driver);
    const session = new DBSQLSession({ handle: sessionHandleStub, context: clientContext });

    await session.executeStatement('SELECT 1', { queryTags: { team: 'eng', app: 'etl' } });

    expect(driverSpy.executeStatement.callCount).to.eq(1);
    const [sentRequest] = driverSpy.executeStatement.firstCall.args;
    expect(sentRequest.confOverlay).to.deep.include({ query_tags: 'team:eng,app:etl' });
  });

  it('should not set confOverlay query_tags when queryTags is not provided', async () => {
    const clientContext = new ClientContextStub();
    const driverSpy = sinon.spy(clientContext.driver);
    const session = new DBSQLSession({ handle: sessionHandleStub, context: clientContext });

    await session.executeStatement('SELECT 1');

    expect(driverSpy.executeStatement.callCount).to.eq(1);
    const [sentRequest] = driverSpy.executeStatement.firstCall.args;
    expect(sentRequest.confOverlay?.query_tags).to.be.undefined;
  });

  it('should not set confOverlay query_tags when queryTags is empty', async () => {
    const clientContext = new ClientContextStub();
    const driverSpy = sinon.spy(clientContext.driver);
    const session = new DBSQLSession({ handle: sessionHandleStub, context: clientContext });

    // An empty dictionary serializes to nothing, so no overlay entry is expected.
    await session.executeStatement('SELECT 1', { queryTags: {} });

    expect(driverSpy.executeStatement.callCount).to.eq(1);
    const [sentRequest] = driverSpy.executeStatement.firstCall.args;
    expect(sentRequest.confOverlay?.query_tags).to.be.undefined;
  });
});

describe('getTypeInfo', () => {
it('should run operation', async () => {
const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() });
Expand Down
66 changes: 66 additions & 0 deletions tests/unit/utils/queryTags.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { expect } from 'chai';
import { serializeQueryTags } from '../../../lib/utils/queryTags';

// Table-driven unit tests for serializeQueryTags: each row registers one test case, in
// the same order and with the same title as a hand-written `it` would have.
describe('serializeQueryTags', () => {
  type TagsInput = Parameters<typeof serializeQueryTags>[0];

  // Inputs that carry no tags at all: nothing should be serialized.
  const emptyInputCases: Array<[string, TagsInput]> = [
    ['should return undefined for null input', null],
    ['should return undefined for undefined input', undefined],
    ['should return undefined for empty object', {}],
  ];

  emptyInputCases.forEach(([title, input]) => {
    it(title, () => {
      expect(serializeQueryTags(input)).to.be.undefined;
    });
  });

  // Inputs with at least one key, paired with the exact serialized string expected.
  const serializationCases: Array<[string, TagsInput, string]> = [
    ['should serialize a single tag', { team: 'engineering' }, 'team:engineering'],
    ['should serialize multiple tags', { team: 'engineering', app: 'etl' }, 'team:engineering,app:etl'],
    ['should omit colon for null value', { team: null }, 'team'],
    ['should omit colon for undefined value', { team: undefined }, 'team'],
    ['should mix null and non-null values', { team: 'eng', flag: null, app: 'etl' }, 'team:eng,flag,app:etl'],
    ['should escape backslash in value', { path: 'a\\b' }, 'path:a\\\\b'],
    ['should escape colon in value', { url: 'http://host' }, 'url:http\\://host'],
    ['should escape comma in value', { list: 'a,b' }, 'list:a\\,b'],
    ['should escape multiple special characters in value', { val: 'a\\b:c,d' }, 'val:a\\\\b\\:c\\,d'],
    ['should escape backslash in key', { 'a\\b': 'value' }, 'a\\\\b:value'],
    ['should escape backslash in key with null value', { 'a\\b': null }, 'a\\\\b'],
    ['should not escape other special characters in keys', { 'key:name': 'value' }, 'key:name:value'],
  ];

  serializationCases.forEach(([title, input, expected]) => {
    it(title, () => {
      expect(serializeQueryTags(input)).to.equal(expected);
    });
  });
});
Loading