Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.nifi.kafka.service.Kafka3ConnectionService;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.kafka.shared.property.HeaderFormat;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
Expand Down Expand Up @@ -164,6 +165,37 @@ void testProcessingStrategyFlowFileHeaderNamePrefix() throws InterruptedExceptio
flowFile.assertAttributeNotExists("custom"); // Non-core attributes should not exist without prefix
}

/**
 * Verifies that, with the FLOW_FILE processing strategy and Header Format set to HEX, a binary
 * Kafka record header value is written to the FlowFile as a lowercase hexadecimal attribute whose
 * name is the configured prefix followed by the header name.
 */
@Test
void testProcessingStrategyFlowFileHexHeaderEncoding() throws InterruptedException, ExecutionException {
// Random topic per test; the consumer group id is the first dash-separated segment of the topic UUID
final String topic = UUID.randomUUID().toString();
final String groupId = topic.substring(0, topic.indexOf("-"));

runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
runner.setProperty(ConsumeKafka.TOPICS, topic);
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
// Select every header name and prefix the resulting attribute names with "kafka.header."
runner.setProperty(ConsumeKafka.HEADER_NAME_PATTERN, ".*");
runner.setProperty(ConsumeKafka.HEADER_NAME_PREFIX, "kafka.header.");
runner.setProperty(ConsumeKafka.HEADER_FORMAT, HeaderFormat.HEX.getValue());

// NOTE(review): per the NiFi TestRunner API this is run(iterations, stopOnFinish, initialize),
// i.e. initialize the processor and leave it running before the record is produced
runner.run(1, false, true);

// Header value containing bytes outside the ASCII range, so a text decoding would not round-trip
final byte[] binaryValue = new byte[] {0x01, 0x02, (byte) 0xab, (byte) 0xff};
final List<Header> headers = List.of(new RecordHeader("binary", binaryValue));
// presumably partition 0 and a null record key; confirm against produceOne in the base class
produceOne(topic, 0, null, RECORD_VALUE, headers);
// Keep triggering the processor until a FlowFile reaches "success"; this loop has no timeout
while (runner.getFlowFilesForRelationship("success").isEmpty()) {
runner.run(1, false, false);
}

// Final trigger with the second flag set, without re-initializing
runner.run(1, true, false);

final Iterator<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).iterator();
assertTrue(flowFiles.hasNext());

final MockFlowFile flowFile = flowFiles.next();
flowFile.assertContentEquals(RECORD_VALUE);
// 0x01 0x02 0xab 0xff rendered as lowercase hex under prefix + header name
flowFile.assertAttributeEquals("kafka.header.binary", "0102abff");
}

/**
* Test ability to specify a topic regular expression to query for messages.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
import org.apache.nifi.kafka.processors.producer.wrapper.InjectMetadataRecord;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.shared.property.HeaderFormat;
import org.apache.nifi.kafka.shared.property.KeyFormat;
import org.apache.nifi.kafka.shared.property.OutputStrategy;
import org.apache.nifi.provenance.ProvenanceEventRecord;
Expand All @@ -36,6 +37,7 @@
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -186,4 +188,37 @@ void testInjectMetadataRecord(final KeyFormat keyFormat, final Verifier verifier
final ProvenanceEventRecord provenanceEvent = provenanceEvents.getFirst();
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
}

/**
 * Verifies that, with the RECORD processing strategy, the INJECT_METADATA output strategy and
 * Header Format set to HEX, a binary Kafka record header value appears as a lowercase hexadecimal
 * string in the headers object nested under the injected metadata field of the first output record.
 */
@Test
void testInjectMetadataRecordHexHeaderEncoding()
throws InterruptedException, ExecutionException, IOException {
// Random topic per test; the consumer group id is the first dash-separated segment of the topic UUID
final String topic = UUID.randomUUID().toString();
final String groupId = topic.substring(0, topic.indexOf("-"));

runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
runner.setProperty(ConsumeKafka.TOPICS, topic);
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.RECORD);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, AutoOffsetReset.EARLIEST);
runner.setProperty(ConsumeKafka.OUTPUT_STRATEGY, OutputStrategy.INJECT_METADATA);
runner.setProperty(ConsumeKafka.KEY_FORMAT, KeyFormat.STRING);
runner.setProperty(ConsumeKafka.HEADER_FORMAT, HeaderFormat.HEX);

// NOTE(review): per the NiFi TestRunner API this is run(iterations, stopOnFinish, initialize),
// i.e. initialize the processor and leave it running before the record is produced
runner.run(1, false, true);
// Message body is the TEST_RESOURCE classpath resource decoded as UTF-8
final String message = new String(IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_RESOURCE))), StandardCharsets.UTF_8);
// Header value containing bytes outside the ASCII range, so a text decoding would not round-trip
final byte[] binaryValue = new byte[] {0x01, 0x02, (byte) 0xab, (byte) 0xff};
produceOne(topic, 0, MESSAGE_KEY, message, Collections.singletonList(
new RecordHeader("binary", binaryValue)));
// Keep triggering the processor until a FlowFile reaches "success"; this loop has no timeout
while (runner.getFlowFilesForRelationship("success").isEmpty()) {
runner.run(1, false, false);
}
// Final trigger with the second flag set, without re-initializing
runner.run(1, true, false);

// Walk the JSON output: array -> first record -> metadata object -> headers object
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).getFirst();
final ArrayNode arrayNode = assertInstanceOf(ArrayNode.class, objectMapper.readTree(flowFile.getContent()));
final ObjectNode record = assertInstanceOf(ObjectNode.class, arrayNode.get(0));
final ObjectNode metadata = assertInstanceOf(ObjectNode.class, record.get(InjectMetadataRecord.METADATA));
final ObjectNode headers = assertInstanceOf(ObjectNode.class, metadata.get(InjectMetadataRecord.HEADERS));
// 0x01 0x02 0xab 0xff rendered as lowercase hex
assertEquals("0102abff", headers.get("binary").asText());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
import org.apache.nifi.kafka.processors.producer.wrapper.InjectMetadataRecord;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.shared.property.HeaderFormat;
import org.apache.nifi.kafka.shared.property.KeyFormat;
import org.apache.nifi.kafka.shared.property.OutputStrategy;
import org.apache.nifi.provenance.ProvenanceEventRecord;
Expand All @@ -36,6 +37,7 @@
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -186,4 +188,36 @@ void testWrapperRecord(final KeyFormat keyFormat, final Verifier verifier)
final ProvenanceEventRecord provenanceEvent = provenanceEvents.getFirst();
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
}

/**
 * Verifies that, with the RECORD processing strategy, the USE_WRAPPER output strategy and
 * Header Format set to HEX, a binary Kafka record header value appears as a lowercase hexadecimal
 * string in the "headers" object of the first wrapper record.
 */
@Test
void testWrapperRecordHexHeaderEncoding()
throws InterruptedException, ExecutionException, IOException {
// Random topic per test; the consumer group id is the first dash-separated segment of the topic UUID
final String topic = UUID.randomUUID().toString();
final String groupId = topic.substring(0, topic.indexOf("-"));

runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
runner.setProperty(ConsumeKafka.TOPICS, topic);
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.RECORD);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, AutoOffsetReset.EARLIEST);
runner.setProperty(ConsumeKafka.OUTPUT_STRATEGY, OutputStrategy.USE_WRAPPER);
runner.setProperty(ConsumeKafka.KEY_FORMAT, KeyFormat.STRING);
runner.setProperty(ConsumeKafka.HEADER_FORMAT, HeaderFormat.HEX);

// NOTE(review): per the NiFi TestRunner API this is run(iterations, stopOnFinish, initialize),
// i.e. initialize the processor and leave it running before the record is produced
runner.run(1, false, true);
// Message body is the TEST_RESOURCE classpath resource decoded as UTF-8
final String message = new String(IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_RESOURCE))), StandardCharsets.UTF_8);
// Header value containing bytes outside the ASCII range, so a text decoding would not round-trip
final byte[] binaryValue = new byte[] {0x01, 0x02, (byte) 0xab, (byte) 0xff};
produceOne(topic, 0, MESSAGE_KEY, message, Collections.singletonList(
new RecordHeader("binary", binaryValue)));
// Keep triggering the processor until a FlowFile reaches "success"; this loop has no timeout
while (runner.getFlowFilesForRelationship("success").isEmpty()) {
runner.run(1, false, false);
}
// Final trigger with the second flag set, without re-initializing
runner.run(1, true, false);

// Walk the JSON output: array -> first wrapper record -> "headers" object
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).getFirst();
final ArrayNode arrayNode = assertInstanceOf(ArrayNode.class, objectMapper.readTree(flowFile.getContent()));
final ObjectNode wrapper = assertInstanceOf(ObjectNode.class, arrayNode.get(0));
final ObjectNode headers = assertInstanceOf(ObjectNode.class, wrapper.get("headers"));
// 0x01 0x02 0xab 0xff rendered as lowercase hex
assertEquals("0102abff", headers.get("binary").asText());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.connector.components.ConnectorMethod;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kafka.processors.common.HeaderValueConverter;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
Expand All @@ -50,6 +51,7 @@
import org.apache.nifi.kafka.service.api.consumer.SessionContext;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.kafka.shared.property.HeaderFormat;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.kafka.shared.property.KeyFormat;
import org.apache.nifi.kafka.shared.property.OutputStrategy;
Expand All @@ -75,6 +77,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HexFormat;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -94,7 +97,8 @@
+ "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
+ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. "
+ "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile. No two Kafka messages will be placed into the same FlowFile if they "
+ "have different schemas, or if they have different values for a message header that is included by the <Headers to Add as Attributes> property.")
+ "have different schemas, or if they have different values for a message header that is included by the <Headers to Add as Attributes> property. "
+ "Kafka Record Header values selected for output are represented according to the Header Format property: as text decoded with the configured Header Encoding character set, or as a lowercase hexadecimal string for binary-safe output.")
@Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
@WritesAttributes({
@WritesAttribute(attribute = "record.count", description = "The number of records received"),
Expand Down Expand Up @@ -194,12 +198,21 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();

/**
 * Required property selecting how Kafka Record Header values are represented in the output.
 * Allowable values come from the {@link HeaderFormat} enum; the default is {@link HeaderFormat#STRING}.
 */
static final PropertyDescriptor HEADER_FORMAT = new PropertyDescriptor.Builder()
.name("Header Format")
.description("Specifies how Kafka Record Header values are represented when written as FlowFile attributes or record fields")
.required(true)
.defaultValue(HeaderFormat.STRING)
.allowableValues(HeaderFormat.class)
.build();

/**
 * Required character set name applied to Kafka Record Header values, validated as a character set
 * and defaulting to UTF-8. Declared as dependent on {@code HEADER_FORMAT} being
 * {@link HeaderFormat#STRING}, since a character set is only meaningful for text output.
 */
static final PropertyDescriptor HEADER_ENCODING = new PropertyDescriptor.Builder()
.name("Header Encoding")
.description("Character encoding applied when reading Kafka Record Header values and writing FlowFile attributes")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(StandardCharsets.UTF_8.name())
.required(true)
.dependsOn(HEADER_FORMAT, HeaderFormat.STRING)
.build();

static final PropertyDescriptor HEADER_NAME_PATTERN = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -322,6 +335,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
MAX_UNCOMMITTED_SIZE,
MAX_UNCOMMITTED_TIME,
HEADER_NAME_PATTERN,
HEADER_FORMAT,
HEADER_ENCODING,
PROCESSING_STRATEGY,
HEADER_NAME_PREFIX,
Expand All @@ -338,7 +352,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
private static final Set<Relationship> SUCCESS_RELATIONSHIP = Set.of(SUCCESS);
private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = Set.of(SUCCESS, PARSE_FAILURE);

private volatile Charset headerEncoding;
private volatile HeaderValueConverter headerValueConverter;
private volatile Pattern headerNamePattern;
private volatile String headerNamePrefix;
private volatile ProcessingStrategy processingStrategy;
Expand Down Expand Up @@ -373,10 +387,21 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
}
}

/**
 * Builds the converter that turns raw Kafka Record Header bytes into their String form,
 * based on the configured Header Format.
 * <ul>
 *   <li>STRING: bytes are decoded with the character set named by the Header Encoding property.</li>
 *   <li>HEX: bytes are rendered with the default {@link HexFormat} (lowercase, no delimiter).</li>
 * </ul>
 * The Header Encoding property is read only for the STRING format.
 *
 * @param context process context supplying the Header Format and Header Encoding property values
 * @return converter applied to each selected header value
 */
private static HeaderValueConverter createHeaderValueConverter(final ProcessContext context) {
    final HeaderFormat selectedFormat = context.getProperty(HEADER_FORMAT).asAllowableValue(HeaderFormat.class);
    return switch (selectedFormat) {
        case HEX -> {
            final HexFormat hexFormat = HexFormat.of();
            yield hexFormat::formatHex;
        }
        case STRING -> {
            final String charsetName = context.getProperty(HEADER_ENCODING).getValue();
            final Charset headerCharset = Charset.forName(charsetName);
            yield headerBytes -> new String(headerBytes, headerCharset);
        }
    };
}

@OnScheduled
public void onScheduled(final ProcessContext context) {
pollingContext = createPollingContext(context);
headerEncoding = Charset.forName(context.getProperty(HEADER_ENCODING).getValue());
headerValueConverter = createHeaderValueConverter(context);

final String headerNamePatternProperty = context.getProperty(HEADER_NAME_PATTERN).getValue();
if (StringUtils.isNotBlank(headerNamePatternProperty)) {
Expand Down Expand Up @@ -634,7 +659,7 @@ private ConfigVerificationResult verifyCanParse(final ProcessContext context, fi
for (final ByteRecord byteRecord : records) {
recordIndex++;
final Map<String, String> recordAttributes = KafkaUtils.toAttributes(
byteRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);
byteRecord, keyEncoding, headerNamePattern, headerValueConverter, commitOffsets);

try (final InputStream inputStream = new ByteArrayInputStream(byteRecord.getValue());
final RecordReader reader = readerFactory.createRecordReader(recordAttributes, inputStream, byteRecord.getValue().length, verificationLogger)) {
Expand Down Expand Up @@ -765,7 +790,7 @@ private Iterator<ByteRecord> transformDemarcator(final ProcessContext context, f

final byte[] demarcator = demarcatorValue.getBytes(StandardCharsets.UTF_8);
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
return new ByteRecordBundler(demarcator, separateByKey, keyEncoding, headerNamePattern, headerEncoding, commitOffsets).bundle(consumerRecords);
return new ByteRecordBundler(demarcator, separateByKey, keyEncoding, headerNamePattern, headerValueConverter, commitOffsets).bundle(consumerRecords);
}

private void processInputRecords(final ProcessContext context, final ProcessSession session, final OffsetTracker offsetTracker, final Iterator<ByteRecord> consumerRecords) {
Expand All @@ -774,13 +799,13 @@ private void processInputRecords(final ProcessContext context, final ProcessSess

final KafkaMessageConverter converter;
if (outputStrategy == OutputStrategy.USE_VALUE) {
converter = new RecordStreamKafkaMessageConverter(readerFactory, writerFactory, headerEncoding, headerNamePattern,
converter = new RecordStreamKafkaMessageConverter(readerFactory, writerFactory, headerValueConverter, headerNamePattern,
keyEncoding, commitOffsets, offsetTracker, getLogger(), brokerUri);
} else if (outputStrategy == OutputStrategy.INJECT_OFFSET) {
converter = new InjectOffsetRecordStreamKafkaMessageConverter(
readerFactory,
writerFactory,
headerEncoding,
headerValueConverter,
headerNamePattern,
keyEncoding,
commitOffsets,
Expand All @@ -793,15 +818,15 @@ private void processInputRecords(final ProcessContext context, final ProcessSess
? context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class) : null;

converter = new WrapperRecordStreamKafkaMessageConverter(readerFactory, writerFactory, keyReaderFactory,
headerEncoding, headerNamePattern, keyFormat, keyEncoding, commitOffsets, offsetTracker, getLogger(), brokerUri, outputStrategy);
headerValueConverter, headerNamePattern, keyFormat, keyEncoding, commitOffsets, offsetTracker, getLogger(), brokerUri, outputStrategy);
}

converter.toFlowFiles(session, consumerRecords);
}

private void processInputFlowFile(final ProcessSession session, final OffsetTracker offsetTracker, final Iterator<ByteRecord> consumerRecords) {
final KafkaMessageConverter converter = new FlowFileStreamKafkaMessageConverter(
headerEncoding, headerNamePattern, headerNamePrefix, keyEncoding, commitOffsets, offsetTracker, brokerUri);
headerValueConverter, headerNamePattern, headerNamePrefix, keyEncoding, commitOffsets, offsetTracker, brokerUri);
converter.toFlowFiles(session, consumerRecords);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.processors.common;

/**
 * Converts a Kafka Record Header value to the String form written as a FlowFile attribute or record field.
 * <p>
 * Declared as a functional interface so that implementations can be supplied as lambdas or method references.
 */
@FunctionalInterface
public interface HeaderValueConverter {
/**
 * Converts the raw bytes of a single header value to its String representation.
 *
 * @param value raw header value bytes; NOTE(review): handling of a null value is not specified here, confirm against implementations and callers
 * @return String representation of the header value
 */
String convert(byte[] value);
}
Loading
Loading