Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions nifi-docs/src/main/asciidoc/administration-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3847,6 +3847,93 @@ The Secrets Manager delegates to Parameter Providers to retrieve secret values.
|`nifi.secrets.manager.cache.duration`|The duration for which resolved secret values are cached before being refreshed from the underlying Parameter Providers. Accepts any NiFi time duration value such as `5 mins`, `30 secs`, etc. A value of `0 sec` disables caching entirely. Defaults to `5 mins`.
|====

== Migrating a Connector from a Versioned Process Group

NiFi supports a one-time path that migrates a newly created Connector from a source Versioned Process Group.
The source Versioned Process Group is used as a reference: the Connector reads it and updates its own managed flow to mirror the source's configuration, parameters, and component state.
The source flow itself is not installed onto the Connector.
Only Connectors that explicitly opt in to migration support can use this capability.

=== Prerequisites for a local migration

When the source is a local Process Group on the same NiFi instance, the Process Group must satisfy all of the following requirements before NiFi will allow the migration:

- The source Process Group must be under version control.
- The source Process Group must be `UP_TO_DATE` with the latest published version in Flow Registry.
- All processors in the source Process Group must be stopped.
- All controller services in the source Process Group must be disabled.
- All queues in the source Process Group must be empty.
- The source Process Group must not reference controller services outside of the Process Group.
- The target Connector must be newly created, stopped, and not previously migrated.
- The target Connector's active flow must be empty when migration begins. Connectors that ship a non-empty initial flow are not compatible with the migration path.

If the source Process Group is not under version control, export it and use the uploaded payload path instead.

=== Migration paths

There are two ways to run the migration:

- Local Process Group migration:
Select an eligible Process Group that already exists on the current NiFi canvas.
NiFi validates the prerequisites listed above before starting the migration.
- Uploaded payload migration:
Export a Process Group definition and upload the resulting flow snapshot payload to the Connector migration endpoint.
This path is intended for non-version-controlled sources, sources exported from another NiFi instance, or any other source that cannot satisfy the local version-control requirements.

For uploaded payloads, NiFi does not require the source flow to be under version control.
The Connector's own migration support check determines whether the uploaded flow can be imported.

=== Running a migration

. Create a new Connector instance.
. If the source Process Group is local and registry-backed, choose it from the Connector migration source list.
. If the source Process Group is not eligible for the local path, export it with component state included and upload the payload to the Connector migration payload endpoint.
. Start the Connector migration request.
. Poll the request until it completes.
. Open the Connector configuration and supply any missing sensitive values or secrets.
. Verify the configuration and start the Connector.

=== What NiFi migrates

During migration, NiFi provides the Connector with the exported flow definition, parameter contexts, referenced assets, and any attached component state for `@Stateful` components.
The Connector performs the flow translation and updates its own managed flow to mirror the source.

For local migrations, a Connector can also copy referenced assets from the source Process Group into the Connector's own asset namespace.
Uploaded payload migrations do not include live access to source assets, so any required asset content must be re-uploaded separately after the migration.

Sensitive parameter values are not present in exported flow definitions.
After migration, configure the Connector with any missing secret values before starting it.

=== Cleanup after a successful migration

After a successful local migration, NiFi disables the source Process Group and renames it with the `(Migrated) ` prefix.
This makes it clear that the source flow has already been used as the basis for a Connector migration.

Uploaded payloads and migration request state are kept only in memory for the lifetime of the request.
When the request succeeds, fails, or is cancelled, NiFi removes the uploaded payload associated with that request.

=== Cluster-topology rule for component state

When the source flow being migrated includes LOCAL component state for a stateful component (for example a `@Stateful` Processor or Controller Service), the destination cluster must have at least as many connected nodes as the source flow's exported state references.
The rule applied by the migration manager is:

----
size(component.localNodeStates) <= number of connected nodes in the destination cluster
----

This is the same rule that applies when importing a Versioned Process Group into a destination cluster.
NiFi enforces it for both the local migration path and the uploaded payload path so that LOCAL state can be unambiguously distributed across the destination cluster.

When the migration request fails with a message such as `Cannot import flow with component state: the flow definition contains local state from N source node(s) but the destination cluster has only M connected node(s)`, take one of the following actions:

- Reconnect any disconnected cluster nodes so that the destination cluster has at least `N` connected nodes, then re-run the migration.
- Re-export the source flow without component state (uncheck the "include component state" option) and re-upload the payload.
The Connector will start with empty LOCAL state on every destination node, which is appropriate when the LOCAL state is reproducible from the upstream system.
- Migrate into a cluster that has at least `N` connected nodes.

The migration request is also rejected when any cluster node is in the `CONNECTING`, `DISCONNECTED`, or `DISCONNECTING` state at the time the request is submitted, because component state and asset content cannot be synchronized to a node that is not currently connected.
Reconnect the disconnected nodes and re-submit the request.

[[upgrading_nifi]]
== Upgrading NiFi

Expand Down
120 changes: 120 additions & 0 deletions nifi-docs/src/main/asciidoc/developer-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2697,6 +2697,126 @@ The following command is used to generate a standard binary distribution of Apac

`mvn clean install -Pcontrib-check`

== Supporting migration from a Versioned Process Group

Connectors can opt in to a one-time migration flow that uses a source Versioned Process Group as a reference for populating a freshly created Connector.
This is intended for Connectors that can translate an exported flow definition, its parameter contexts, referenced assets, and any attached component state into their own managed flow.
The source Versioned Process Group is not installed onto the Connector; instead, the Connector reads the source as input and updates its own flow to mirror the source's configuration and state.

Migration support is optional.
Connectors opt in by implementing the optional capability interface `org.apache.nifi.components.connector.migration.MigratableConnector`.
A Connector that does not implement `MigratableConnector` is never offered as a migration target and continues to behave as before.

=== Migration contract

`MigratableConnector` is a separate interface that the Connector class must implement (typically alongside extending `AbstractConnector`). It exposes three methods:

- `isMigrationSupported(ConnectorMigrationContext)` is a cheap, side-effect-free predicate.
It should inspect `context.getSourceFlow()` and return `true` only when the Connector can be migrated from the source structure.
Implementations should limit themselves to structural checks such as processor types, parameter names, and exported metadata.
Component state and asset content are not available at listing time and must not be relied on; per-state precondition checks belong inside `migrateConfiguration` or `migrateState`.
This method must not call `ConnectorMigrationContext.copyAssetFromSource(...)` or any of the write APIs described below; the framework wraps the eligibility-time context so any such attempt is rejected with `IllegalStateException` and the Connector is filtered out of the listing.
- `migrateConfiguration(ConnectorMigrationContext)` records the configuration the Connector wants on the other side of the migration.
The Connector reads `context.getSourceFlow()` and records configuration changes via `context.setProperties(stepName, propertyValues)` (merge) or `context.replaceProperties(stepName, propertyValues)` (full replacement of the step).
These calls are applied to a working copy of the Connector's configuration that the framework seeded with the Connector's current active configuration, so the working copy always reflects the fully-merged result.
After `migrateConfiguration` returns, the framework drives `Connector.applyUpdate(workingContext, activeContext)` from that merged configuration so the managed Process Group is rebuilt by the same code path the framework uses on restart.
The Connector must not call `updateFlow(...)` from this method; the framework hands the Connector a read-only wrapper around the active `FlowContext`, so any direct `updateFlow(...)` attempt is rejected with `IllegalStateException`.
Component-state writes are also rejected in this phase: `setComponentState(...)` throws because the managed components do not exist yet.
- `migrateState(ConnectorMigrationContext)` runs after the framework has rebuilt the managed Process Group from the merged configuration.
At this point every Processor and Controller Service in the source flow has a corresponding managed component in the active `FlowContext`, so the Connector can match each source `VersionedConfigurableExtension` to its managed counterpart by versioned identifier.
The Connector records the desired `StateManager` state for each managed component via `context.setComponentState(versionedComponentId, versionedComponentState)`.
After `migrateState` returns, the framework writes each staged `VersionedComponentState` into the corresponding managed component's live `StateManager`.
Configuration writes (`setProperties`, `replaceProperties`) and direct `updateFlow(...)` calls are rejected in this phase for the same reasons described above.

The two phases together form a single, all-or-nothing migration. The framework only commits the merged configuration onto the Connector's persisted active configuration after both `migrateState` and the framework's subsequent state-application step have succeeded. Any failure in either phase rolls the migration back: the managed Process Group is restored from `getInitialFlow()`, the working configuration copy is discarded, any assets copied via `copyAssetFromSource(...)` are deleted, and the Connector's persisted configuration remains unchanged.

=== Accessing the source flow

`ConnectorMigrationContext` exposes the source `VersionedExternalFlow` using `getSourceFlow()`.
The source includes:

- The exported `VersionedProcessGroup`
- Parameter contexts
- Referenced assets
- Attached `VersionedComponentState` for `@Stateful` components
- Export metadata such as flow name, registry identifiers, and version

The same context also indicates whether the request came from a local Process Group on the same NiFi instance or from an uploaded payload using `isLocalMigration()`.

The active `FlowContext` is available via `context.getActiveFlowContext()` and is read-only during migration: it forwards reads (configuration, managed Process Group) to the live active flow context but rejects `updateFlow(...)` with `IllegalStateException`. The Connector should treat this object as a window onto the current state of the Connector, not as a write target.

=== Recording configuration changes

A typical `migrateConfiguration` implementation translates the source flow into a representation the Connector can rebuild from later (often a serialized version of the source `VersionedExternalFlow` or a derived configuration), then stages it onto the Connector's own configuration:

[source,java]
----
@Override
public void migrateConfiguration(final ConnectorMigrationContext context) throws FlowUpdateException {
final VersionedExternalFlow sourceFlow = context.getSourceFlow();
final String serializedSourceFlow = OBJECT_MAPPER.writeValueAsString(sourceFlow);

// Record the configuration the Connector wants on the other side of the migration. The framework merges this
// delta onto the active configuration and then drives Connector.applyUpdate(workingContext, activeContext),
// which is the same path used on every restart - so the managed Process Group is rebuilt the same way after
// migration and after every subsequent restart.
context.setProperties(MIGRATION_STATE_STEP_NAME, Map.of(MIGRATED_SOURCE_FLOW_PROPERTY_NAME, serializedSourceFlow));
}
----

`setProperties` is additive: existing properties on the step keep their values unless the staged map overrides them.
`replaceProperties` is destructive: the step is reset to exactly the supplied property map, and any pre-existing properties not present in the map are removed.

The Connector's `applyUpdate(workingContext, activeContext)` implementation reads the merged configuration from the supplied `workingContext`, rebuilds the desired managed Process Group, and calls `getInitializationContext().updateFlow(activeContext, rebuiltFlow)` to install it. Because the framework drives the same `applyUpdate(...)` call during restart via `inheritConfiguration(...)`, the migrated managed flow is rebuilt automatically on every restart from the persisted configuration; the Connector does not need to do anything extra to make migrations survive restarts.

=== Recording component state

After the managed Process Group has been rebuilt, `migrateState` records the `StateManager` state for individual managed components:

[source,java]
----
@Override
public void migrateState(final ConnectorMigrationContext context) {
final VersionedProcessGroup sourceGroup = context.getSourceFlow().getFlowContents();
for (final VersionedProcessor sourceProcessor : sourceGroup.getProcessors()) {
final VersionedComponentState sourceState = sourceProcessor.getComponentState();
if (sourceState != null) {
context.setComponentState(sourceProcessor.getIdentifier(), sourceState);
}
}
}
----

The first argument to `setComponentState` is the source-side versioned identifier; the framework resolves it to the matching managed Processor or Controller Service and writes the supplied `VersionedComponentState` into that component's live `StateManager`. Cluster-scope state is applied unconditionally on every node; per-node local-scope state is selected by node ordinal so a clustered migration installs the right entry on each node. Components whose `@Stateful` annotation does not declare a requested scope are skipped with a warning rather than failing the migration.

`setComponentState` only stages the desired state; the framework writes it after `migrateState` returns. If a Connector calls `setComponentState` more than once for the same versioned identifier, only the last call is applied.

=== Rewriting parameters and assets

The exported source flow contains parameter contexts and any asset references that were attached to exported parameters.
Connectors are responsible for mapping those values into their own target shape inside `migrateConfiguration`.

A Connector can copy an asset from a local source Process Group into its own asset namespace using `ConnectorMigrationContext.copyAssetFromSource(String)`.
This method is available only for local migrations.
It throws `IllegalArgumentException` when the supplied source asset identifier is null or blank, and `IllegalStateException` when invoked for an uploaded-payload migration.
Uploaded payloads do not have access to a live source asset namespace, so asset references from uploaded payloads must be handled outside of the migration request.

=== Sensitive values

Sensitive parameter values and other secret material are not present in the exported source flow.
Connectors must leave those values unset during migration.
After the migration completes, the user supplies the missing values through the normal Configure step before starting the Connector.

=== Additional expectations

- `isMigrationSupported(...)` must not mutate flow state, parameter values, or assets.
- `migrateConfiguration` and `migrateState` should either complete successfully or throw an exception. Any thrown exception aborts the migration and rolls back every side effect: copied assets are deleted, the managed Process Group is restored from `getInitialFlow()`, every component's `StateManager` state is cleared, and the persisted active configuration is left unchanged.
- Migration is rejected when the Connector's currently managed flow no longer matches the flow returned by its `getInitialFlow()`.
The framework compares the two structurally (ignoring environmental differences such as component scheduled state) at the time `migrateConfiguration` is invoked, and again whenever the `MIGRATE` allowable action is evaluated, so the user is shown why migration is unavailable.
This check is restart-safe because both inputs are recomputed each time from durable sources: the Connector's persisted configuration drives `getInitialFlow()`, and the current managed flow is read from the live flow tree restored from `flow.json.gz`.
- Migration is one-shot from the framework's point of view.
Once the Connector's flow has been updated by a migration, its currently managed flow no longer matches `getInitialFlow()`, and a subsequent migration request will be rejected until the Connector is discarded and a fresh one is created.

== How to contribute to Apache NiFi

We are always excited to have contributions from the community - especially from new contributors!
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.xml.bind.annotation.XmlType;

@XmlType(name = "migrationPayload")
public class MigrationPayloadDTO {
private String payloadId;

@Schema(description = "The identifier of the uploaded migration payload.")
public String getPayloadId() {
return payloadId;
}

public void setPayloadId(final String payloadId) {
this.payloadId = payloadId;
}
}
Loading
Loading