From 8e4b96b915befc98272fafa2c2d8a5d0d5ce73f0 Mon Sep 17 00:00:00 2001 From: GT610 Date: Sat, 16 May 2026 13:47:44 +0800 Subject: [PATCH 1/3] Fix SFTP downloading --- lib/src/sftp/sftp_client.dart | 46 +- lib/src/socket/ssh_socket_io.dart | 91 +-- lib/src/ssh_channel.dart | 1244 ++++++++++++++--------------- 3 files changed, 706 insertions(+), 675 deletions(-) diff --git a/lib/src/sftp/sftp_client.dart b/lib/src/sftp/sftp_client.dart index ea13926..53ad054 100644 --- a/lib/src/sftp/sftp_client.dart +++ b/lib/src/sftp/sftp_client.dart @@ -33,7 +33,20 @@ class SftpClient { SftpClient(this._channel, {this.printDebug, this.printTrace}) { _startHandshake(); - _channel.stream.listen(_handleData); + _channel.stream.listen( + _handleData, + onError: (Object e, _) { + print('[SFTP] stream onError: $e'); + if (!_done.isCompleted) { + _done.completeError(e); + } + }, + onDone: () { + if (!_done.isCompleted) { + _done.complete(); + } + }, + ); } final _buffer = ChunkBuffer(); @@ -444,18 +457,34 @@ class SftpClient { } void _handleData(SSHChannelData data) { - _buffer.add(data.bytes); - _handlePackets(); + try { + _buffer.add(data.bytes); + _handlePackets(); + } catch (e) { + print('[SFTP] _handleData ERROR: $e'); + } } void _handlePackets() { const lengthHeader = 4; // 4 bytes packet length header while (_buffer.length >= lengthHeader) { - final length = _buffer.byteData.getUint32(0); - if (_buffer.length < lengthHeader + length) break; - final packet = _buffer.consume(lengthHeader + length); - final payload = Uint8List.sublistView(packet, lengthHeader); - _handlePacket(payload); + try { + final length = _buffer.byteData.getUint32(0); + // SFTP payload should not exceed a reasonable limit (16MB) + if (length > 16 * 1024 * 1024) { + print('[SFTP] _handlePackets suspicious length=$length bufferLen=${_buffer.length}'); + _buffer.clear(); + return; + } + if (_buffer.length < lengthHeader + length) break; + final packet = _buffer.consume(lengthHeader + length); + final payload = Uint8List.sublistView(packet, lengthHeader); + _handlePacket(payload); + } catch (e) { + print('[SFTP] _handlePackets ERROR: $e'); + _buffer.clear(); + return; + } } } @@ -477,6 +506,7 @@ class SftpClient { case SftpExtendedReplyPacket.packetType: return _handleExtendedReplyPacket(payload); default: + print('[SFTP] UNKNOWN packet type=$type'); printDebug?.call('SftpClient._handlePacket: unknown packet: $type'); } } diff --git a/lib/src/socket/ssh_socket_io.dart b/lib/src/socket/ssh_socket_io.dart index a8f155b..67a4c47 100644 --- a/lib/src/socket/ssh_socket_io.dart +++ b/lib/src/socket/ssh_socket_io.dart @@ -1,45 +1,46 @@ -import 'dart:async'; -import 'dart:io'; -import 'dart:typed_data'; - -import 'package:dartssh2/src/socket/ssh_socket.dart'; - -Future connectNativeSocket( - String host, - int port, { - Duration? timeout, -}) async { - final socket = await Socket.connect(host, port, timeout: timeout); - return _SSHNativeSocket._(socket); -} - -class _SSHNativeSocket implements SSHSocket { - final Socket _socket; - - _SSHNativeSocket._(this._socket); - - @override - Stream get stream => _socket; - - @override - StreamSink> get sink => _socket; - - @override - Future close() async { - await _socket.close(); - } - - @override - Future get done => _socket.done; - - @override - void destroy() { - _socket.destroy(); - } - - @override - String toString() { - final address = '${_socket.remoteAddress.host}:${_socket.remotePort}'; - return '_SSHNativeSocket($address)'; - } -} +import 'dart:async'; +import 'dart:io'; +import 'dart:typed_data'; + +import 'package:dartssh2/src/socket/ssh_socket.dart'; + +Future connectNativeSocket( + String host, + int port, { + Duration? timeout, +}) async { + final socket = await Socket.connect(host, port, timeout: timeout); + socket.setOption(SocketOption.tcpNoDelay, true); + return _SSHNativeSocket._(socket); +} + +class _SSHNativeSocket implements SSHSocket { + final Socket _socket; + + _SSHNativeSocket._(this._socket); + + @override + Stream get stream => _socket; + + @override + StreamSink> get sink => _socket; + + @override + Future close() async { + await _socket.close(); + } + + @override + Future get done => _socket.done; + + @override + void destroy() { + _socket.destroy(); + } + + @override + String toString() { + final address = '${_socket.remoteAddress.host}:${_socket.remotePort}'; + return '_SSHNativeSocket($address)'; + } +} diff --git a/lib/src/ssh_channel.dart b/lib/src/ssh_channel.dart index e95afef..babe0f5 100644 --- a/lib/src/ssh_channel.dart +++ b/lib/src/ssh_channel.dart @@ -1,622 +1,622 @@ -import 'dart:async'; -import 'dart:math'; - -import 'dart:typed_data'; - -import 'package:dartssh2/src/ssh_channel_id.dart'; -import 'package:dartssh2/src/ssh_transport.dart'; -import 'package:dartssh2/src/utils/async_queue.dart'; -import 'package:dartssh2/src/message/base.dart'; -import 'package:dartssh2/src/utils/stream.dart'; - -/// Handler of channel requests. Return true if the request was handled, false -/// if the request was not recognized or could not be handled. -typedef SSHChannelRequestHandler = bool Function( - SSH_Message_Channel_Request request, -); - -class SSHChannelController { - static const _initialWindowSize = 2 * 1024 * 1024; - static const _windowAdjustThreshold = _initialWindowSize / 2; - - final int localId; - final int localMaximumPacketSize; - final int localInitialWindowSize; - - final int remoteId; - final int remoteMaximumPacketSize; - final int remoteInitialWindowSize; - - final SSHPrintHandler? printDebug; - - final void Function(SSHMessage) sendMessage; - - SSHChannel get channel => SSHChannel(this); - - SSHChannelController({ - required this.localId, - required this.localMaximumPacketSize, - required this.localInitialWindowSize, - required this.remoteId, - required this.remoteInitialWindowSize, - required this.remoteMaximumPacketSize, - required this.sendMessage, - this.printDebug, - }) { - if (remoteInitialWindowSize > 0) { - _uploadLoop.activate(); - } - } - - /// Remaining local receive window size. - late var _localWindow = localInitialWindowSize; - - /// Remaining remote receive window size. - late var _remoteWindow = remoteInitialWindowSize; - - /// A [StreamController] that receives data from the remote side. - late final _remoteStream = StreamController( - onResume: _sendWindowAdjustIfNeeded, - ); - - /// A [StreamController] that accepts data from local end of the channel. - final _localStream = StreamController(); - - late final _localStreamConsumer = SSHChannelDataConsumer(_localStream.stream); - - /// Handler of channel requests from the remote side. - late var _requestHandler = _defaultRequestHandler; - - /// An [AsyncQueue] of pending request replies from the remote side. - final _requestReplyQueue = AsyncQueue(); - - /// Fails all pending request reply waiters. - void failPendingRequestReplies(Object error, [StackTrace? stackTrace]) { - _requestReplyQueue.failAll(error, stackTrace ?? StackTrace.current); - } - - /// true if we have sent an EOF message to the remote side. - var _hasSentEOF = false; - - /// true if we have sent an close message to the remote side. - var _hasSentClose = false; - - final _done = Completer(); - - Future sendExec(String command) async { - sendMessage( - SSH_Message_Channel_Request.exec( - recipientChannel: remoteId, - wantReply: true, - command: command, - ), - ); - return await _requestReplyQueue.next; - } - - Future sendPtyReq({ - String terminalType = 'xterm-256color', - int terminalWidth = 80, - int terminalHeight = 25, - int terminalPixelWidth = 0, - int terminalPixelHeight = 0, - Uint8List? terminalModes, - }) async { - sendMessage( - SSH_Message_Channel_Request.pty( - recipientChannel: remoteId, - termType: terminalType, - termWidth: terminalWidth, - termHeight: terminalHeight, - termPixelWidth: terminalPixelWidth, - termPixelHeight: terminalPixelHeight, - termModes: terminalModes ?? Uint8List(0), - wantReply: true, - ), - ); - return await _requestReplyQueue.next; - } - - Future sendShell() async { - sendMessage( - SSH_Message_Channel_Request.shell( - recipientChannel: remoteId, - wantReply: true, - ), - ); - return await _requestReplyQueue.next; - } - - Future sendX11Req({ - bool singleConnection = false, - String authenticationProtocol = 'MIT-MAGIC-COOKIE-1', - required String authenticationCookie, - int screenNumber = 0, - }) async { - sendMessage( - SSH_Message_Channel_Request.x11( - recipientChannel: remoteId, - wantReply: true, - singleConnection: singleConnection, - x11AuthenticationProtocol: authenticationProtocol, - x11AuthenticationCookie: authenticationCookie, - x11ScreenNumber: screenNumber, - ), - ); - return await _requestReplyQueue.next; - } - - Future sendAgentForwardingRequest() async { - sendMessage( - SSH_Message_Channel_Request( - recipientChannel: remoteId, - requestType: SSHChannelRequestType.authAgent, - wantReply: true, - ), - ); - return await _requestReplyQueue.next; - } - - Future sendSubsystem(String subsystem) async { - sendMessage( - SSH_Message_Channel_Request.subsystem( - recipientChannel: remoteId, - subsystemName: subsystem, - wantReply: true, - ), - ); - return await _requestReplyQueue.next; - } - - Future sendEnv(String name, String value) async { - sendMessage( - SSH_Message_Channel_Request.env( - recipientChannel: remoteId, - variableName: name, - variableValue: value, - wantReply: true, - ), - ); - return await _requestReplyQueue.next; - } - - void sendSignal(String signal) { - sendMessage( - SSH_Message_Channel_Request.signal( - recipientChannel: remoteId, - signalName: signal, - ), - ); - } - - void sendTerminalWindowChange({ - required int width, - required int height, - required int pixelWidth, - required int pixelHeight, - }) { - sendMessage( - SSH_Message_Channel_Request.windowChange( - recipientChannel: remoteId, - termWidth: width, - termHeight: height, - termPixelWidth: pixelWidth, - termPixelHeight: pixelHeight, - ), - ); - } - - void handleMessage(SSHMessage message) { - if (message is SSH_Message_Channel_Data) { - _handleDataMessage(message.data); - } else if (message is SSH_Message_Channel_Extended_Data) { - _handleDataMessage(message.data, type: message.dataTypeCode); - } else if (message is SSH_Message_Channel_Window_Adjust) { - _handleWindowAdjustMessage(message.bytesToAdd); - } else if (message is SSH_Message_Channel_EOF) { - _handleEOFMessage(); - } else if (message is SSH_Message_Channel_Close) { - _handleCloseMessage(); - } else if (message is SSH_Message_Channel_Request) { - _handleRequestMessage(message); - } else if (message is SSH_Message_Channel_Success) { - _handleRequestSuccessMessage(); - } else if (message is SSH_Message_Channel_Failure) { - _handleRequestFailureMessage(); - } else { - throw UnimplementedError('Unimplemented message: $message'); - } - } - - /// Closes our side of the channel. Returns a [Future] that completes when - /// the remote side has closed the channel. - Future close() async { - if (_done.isCompleted) return; - - _localStreamConsumer.cancel(); - _sendEOFIfNeeded(); - - if (_remoteStream.isClosed) { - _sendCloseIfNeeded(); - _done.complete(); - return; - } - - return _done.future; - } - - /// Closes the channel immediately in both directions. This may send a close - /// message to the remote side. After this no more data can be sent or - /// received. - void destroy() { - if (_done.isCompleted) return; - _remoteStream.close(); - _localStreamConsumer.cancel(); - _sendEOFIfNeeded(); - _sendCloseIfNeeded(); - _done.complete(); - } - - void _handleWindowAdjustMessage(int bytesToAdd) { - printDebug?.call('SSHChannel._handleWindowAdjustMessage: $bytesToAdd'); - - if (bytesToAdd < 0) { - throw ArgumentError.value(bytesToAdd, 'bytesToAdd', 'must be positive'); - } - - final next = _remoteWindow + bytesToAdd; - _remoteWindow = next & 0xFFFFFFFF; // 2³²-1 Overflow - - if (_remoteWindow > 0) { - _uploadLoop.activate(); - } - } - - void _handleDataMessage(Uint8List data, {int? type}) { - printDebug?.call('SSHChannel._handleDataMessage: len=${data.length}'); - - if (_remoteStream.isClosed) { - printDebug?.call('SSHChannel._handleDataMessage: remote already closed'); - return; - } - - _remoteStream.add(SSHChannelData(data, type: type)); - - _localWindow -= data.length; - if (_localWindow < 0) { - // Log and recover by issuing a window adjust; negative indicates protocol - // mismatch or delay in flow control from the remote. We prefer recovery - // to hard-closing for better interoperability. - printDebug?.call( - 'SSHChannel._handleDataMessage: local window underflow ' - '(localWindow=$_localWindow); recovering with WINDOW_ADJUST', - ); - } - - _sendWindowAdjustIfNeeded(); - } - - void _handleRequestMessage(SSH_Message_Channel_Request request) { - printDebug?.call('SSHChannel._handleRequest: ${request.requestType}'); - - final success = _requestHandler(request); - if (!request.wantReply) return; - success ? _sendRequestSuccess() : _sendRequestFailure(); - } - - void _handleRequestSuccessMessage() { - printDebug?.call('SSHChannel._handleRequestSuccessMessage'); - _requestReplyQueue.add(true); - } - - void _handleRequestFailureMessage() { - printDebug?.call('SSHChannel._handleRequestFailureMessage'); - _requestReplyQueue.add(false); - } - - void _handleEOFMessage() { - printDebug?.call('SSHChannel._handleEOFMessage'); - _remoteStream.close(); - } - - void _handleCloseMessage() { - printDebug?.call('SSHChannel._handleCLoseMessage'); - _remoteStream.close(); - close(); - } - - bool _defaultRequestHandler(SSH_Message_Channel_Request request) { - return false; - } - - void _sendEOFIfNeeded() { - printDebug?.call('SSHChannel._sendEOFIfNeeded'); - if (_done.isCompleted) return; - if (_hasSentEOF) return; - _hasSentEOF = true; - sendMessage(SSH_Message_Channel_EOF(recipientChannel: remoteId)); - } - - void _sendCloseIfNeeded() { - printDebug?.call('SSHChannel._sendCloseIfNeeded'); - if (_done.isCompleted) return; - if (_hasSentClose) return; - _hasSentClose = true; - - try { - sendMessage(SSH_Message_Channel_Close(recipientChannel: remoteId)); - } catch (e) { - printDebug?.call('SSHChannelController._sendCloseIfNeeded - error: $e'); - } - } - - void _sendRequestSuccess() { - printDebug?.call('SSHChannel._sendRequestSuccess'); - sendMessage(SSH_Message_Channel_Success(recipientChannel: remoteId)); - } - - void _sendRequestFailure() { - printDebug?.call('SSHChannel._sendRequestFailure'); - sendMessage(SSH_Message_Channel_Failure(recipientChannel: remoteId)); - } - - void _sendWindowAdjustIfNeeded() { - printDebug?.call('SSHChannel._sendWindowAdjustIfNeeded'); - - if (_done.isCompleted) return; - if (_remoteStream.isPaused) return; - - // Only send a window adjust message if the window is below the threshold. - // Assumes _windowAdjustThreshold is non-negative. - // If _localWindow is negative (an invalid state), it will likely be <= _windowAdjustThreshold. - if (_localWindow > _windowAdjustThreshold) return; - - const maxRfcWindowSize = 0xFFFFFFFF; // 2^32 - 1 - - // Determine the target window size, respecting RFC limits. - // localInitialWindowSize is typically a positive value (e.g., 2MB). - final int targetWindow = (localInitialWindowSize > maxRfcWindowSize || - localInitialWindowSize < 0) - ? maxRfcWindowSize - : localInitialWindowSize; - - // If the current local window is already at or above the target, - // no further increase is needed. - if (_localWindow >= targetWindow) { - // As a defensive measure, if _localWindow somehow exceeded the absolute max, - // cap it. This is more about correcting an invalid state than calculating - // bytesToAdd for this specific adjustment. - if (_localWindow > maxRfcWindowSize) { - _localWindow = maxRfcWindowSize; - } - return; - } - - // At this point, _localWindow < targetWindow. - // Calculate the number of bytes to add to reach the targetWindow. - // This will be a positive value. If _localWindow was negative (erroneous state), - // bytesToAdd will be appropriately larger to compensate. - final int bytesToAdd = targetWindow - _localWindow; - - // Update our local window state to reflect the window size we are now advertising. - _localWindow = targetWindow; - - sendMessage(SSH_Message_Channel_Window_Adjust( - recipientChannel: remoteId, - bytesToAdd: bytesToAdd, - )); - } - - late final _uploadLoop = OnceSimultaneously(() async { - while (true) { - if (_remoteWindow <= 0) { - return; - } - - final dataToRead = min(_remoteWindow, remoteMaximumPacketSize); - final data = await _localStreamConsumer.read(dataToRead); - - if (data == null) { - _sendEOFIfNeeded(); - - if (_remoteStream.isClosed) { - close(); - } - return; - } - - if (_hasSentEOF) { - return; - } - - printDebug?.call('SSHChannel._uploadLoop: len=${data.bytes.length}'); - - final message = data.isExtendedData - ? SSH_Message_Channel_Extended_Data( - recipientChannel: remoteId, - dataTypeCode: data.type!, - data: data.bytes, - ) - : SSH_Message_Channel_Data( - recipientChannel: remoteId, - data: data.bytes, - ); - - sendMessage(message); - - _remoteWindow -= data.bytes.length; - } - }); -} - -class SSHChannel { - /// The channel id on the local side. - SSHChannelId get channelId => _controller.localId; - - /// The channel id on the remote side. - SSHChannelId get remoteChannelId => _controller.localId; - - /// The maximum packet size that the remote side can receive. - int get maximumPacketSize => _controller.remoteMaximumPacketSize; - - /// A [Stream] of data received from the remote side. - Stream get stream => _controller._remoteStream.stream; - - /// A [StreamSink] that sends data to the remote side. Chucks must be - /// equal to or less than [maximumPacketSize]. - StreamSink get sink => _controller._localStream.sink; - - Future get done => _controller._done.future; - - SSHChannel(this._controller); - - final SSHChannelController _controller; - - /// Send data to the remote side. - void addData(Uint8List data, {int? type}) { - sink.add(SSHChannelData(data, type: type)); - } - - void setRequestHandler(SSHChannelRequestHandler handler) { - _controller._requestHandler = handler; - } - - Future sendExec(String command) async { - return await _controller.sendExec(command); - } - - Future sendShell() async { - return await _controller.sendShell(); - } - - Future sendX11Req({ - bool singleConnection = false, - String authenticationProtocol = 'MIT-MAGIC-COOKIE-1', - required String authenticationCookie, - int screenNumber = 0, - }) async { - return await _controller.sendX11Req( - singleConnection: singleConnection, - authenticationProtocol: authenticationProtocol, - authenticationCookie: authenticationCookie, - screenNumber: screenNumber, - ); - } - - void sendTerminalWindowChange({ - required int width, - required int height, - int pixelWidth = 0, - int pixelHeight = 0, - }) { - _controller.sendTerminalWindowChange( - width: width, - height: height, - pixelWidth: pixelWidth, - pixelHeight: pixelHeight, - ); - } - - void sendSignal(String signal) { - _controller.sendSignal(signal); - } - - /// Closes our side of the channel. Returns a [Future] that completes when - /// both sides of the channel are closed. - Future close() { - _controller._sendEOFIfNeeded(); - return _controller.close(); - } - - /// Destroys the channel in both directions. After calling this method, - /// no more data can be sent or received. - void destroy() => _controller.destroy(); - - @override - String toString() => 'SSHChannel($channelId:$remoteChannelId)'; -} - -class SSHChannelData { - /// Type of the data. Not null if the data is extended data. See: [SSHChannelExtendedDataType] - final int? type; - - final Uint8List bytes; - - bool get isExtendedData => type != null; - - SSHChannelData(this.bytes, {this.type}); -} - -class SSHChannelExtendedDataType { - static const stderr = 1; -} - -class SSHChannelDataSplitter - extends StreamTransformerBase { - SSHChannelDataSplitter(this.maxSize); - - final int maxSize; - - @override - Stream bind(Stream stream) async* { - await for (var chunk in stream) { - if (chunk.bytes.length < maxSize) { - yield chunk; - continue; - } - - final blocks = chunk.bytes.length ~/ maxSize; - - for (var i = 0; i < blocks; i++) { - yield SSHChannelData( - Uint8List.sublistView(chunk.bytes, i * maxSize, (i + 1) * maxSize), - type: chunk.type, - ); - } - - if (blocks * maxSize < chunk.bytes.length) { - yield SSHChannelData( - Uint8List.sublistView(chunk.bytes, blocks * maxSize), - type: chunk.type, - ); - } - } - } -} - -class SSHChannelDataConsumer extends StreamConsumerBase { - SSHChannelDataConsumer(super.stream); - - @override - int getLength(SSHChannelData chunk) { - return chunk.bytes.length; - } - - @override - SSHChannelData getSublistView(SSHChannelData chunk, int start, int end) { - return SSHChannelData( - Uint8List.sublistView(chunk.bytes, start, end), - type: chunk.type, - ); - } -} - -/// A function that can be invoked at most once simultaneously. -class OnceSimultaneously { - OnceSimultaneously(this._fn); - - final Future Function() _fn; - - var _isRunning = false; - - /// Call the function. If the function is already running, this is a no-op. - void activate() async { - if (_isRunning) return; - _isRunning = true; - try { - await _fn(); - } finally { - _isRunning = false; - } - } -} +import 'dart:async'; +import 'dart:math'; + +import 'dart:typed_data'; + +import 'package:dartssh2/src/ssh_channel_id.dart'; +import 'package:dartssh2/src/ssh_transport.dart'; +import 'package:dartssh2/src/utils/async_queue.dart'; +import 'package:dartssh2/src/message/base.dart'; +import 'package:dartssh2/src/utils/stream.dart'; + +/// Handler of channel requests. Return true if the request was handled, false +/// if the request was not recognized or could not be handled. +typedef SSHChannelRequestHandler = bool Function( + SSH_Message_Channel_Request request, +); + +class SSHChannelController { + static const _initialWindowSize = 2 * 1024 * 1024; + static const _windowAdjustThreshold = _initialWindowSize / 2; + + final int localId; + final int localMaximumPacketSize; + final int localInitialWindowSize; + + final int remoteId; + final int remoteMaximumPacketSize; + final int remoteInitialWindowSize; + + final SSHPrintHandler? printDebug; + + final void Function(SSHMessage) sendMessage; + + SSHChannel get channel => SSHChannel(this); + + SSHChannelController({ + required this.localId, + required this.localMaximumPacketSize, + required this.localInitialWindowSize, + required this.remoteId, + required this.remoteInitialWindowSize, + required this.remoteMaximumPacketSize, + required this.sendMessage, + this.printDebug, + }) { + if (remoteInitialWindowSize > 0) { + _uploadLoop.activate(); + } + } + + /// Remaining local receive window size. + late var _localWindow = localInitialWindowSize; + + /// Remaining remote receive window size. + late var _remoteWindow = remoteInitialWindowSize; + + /// A [StreamController] that receives data from the remote side. + late final _remoteStream = StreamController( + onResume: _sendWindowAdjustIfNeeded, + ); + + /// A [StreamController] that accepts data from local end of the channel. + final _localStream = StreamController(); + + late final _localStreamConsumer = SSHChannelDataConsumer(_localStream.stream); + + /// Handler of channel requests from the remote side. + late var _requestHandler = _defaultRequestHandler; + + /// An [AsyncQueue] of pending request replies from the remote side. + final _requestReplyQueue = AsyncQueue(); + + /// Fails all pending request reply waiters. + void failPendingRequestReplies(Object error, [StackTrace? stackTrace]) { + _requestReplyQueue.failAll(error, stackTrace ?? StackTrace.current); + } + + /// true if we have sent an EOF message to the remote side. + var _hasSentEOF = false; + + /// true if we have sent an close message to the remote side. + var _hasSentClose = false; + + final _done = Completer(); + + Future sendExec(String command) async { + sendMessage( + SSH_Message_Channel_Request.exec( + recipientChannel: remoteId, + wantReply: true, + command: command, + ), + ); + return await _requestReplyQueue.next; + } + + Future sendPtyReq({ + String terminalType = 'xterm-256color', + int terminalWidth = 80, + int terminalHeight = 25, + int terminalPixelWidth = 0, + int terminalPixelHeight = 0, + Uint8List? terminalModes, + }) async { + sendMessage( + SSH_Message_Channel_Request.pty( + recipientChannel: remoteId, + termType: terminalType, + termWidth: terminalWidth, + termHeight: terminalHeight, + termPixelWidth: terminalPixelWidth, + termPixelHeight: terminalPixelHeight, + termModes: terminalModes ?? Uint8List(0), + wantReply: true, + ), + ); + return await _requestReplyQueue.next; + } + + Future sendShell() async { + sendMessage( + SSH_Message_Channel_Request.shell( + recipientChannel: remoteId, + wantReply: true, + ), + ); + return await _requestReplyQueue.next; + } + + Future sendX11Req({ + bool singleConnection = false, + String authenticationProtocol = 'MIT-MAGIC-COOKIE-1', + required String authenticationCookie, + int screenNumber = 0, + }) async { + sendMessage( + SSH_Message_Channel_Request.x11( + recipientChannel: remoteId, + wantReply: true, + singleConnection: singleConnection, + x11AuthenticationProtocol: authenticationProtocol, + x11AuthenticationCookie: authenticationCookie, + x11ScreenNumber: screenNumber, + ), + ); + return await _requestReplyQueue.next; + } + + Future sendAgentForwardingRequest() async { + sendMessage( + SSH_Message_Channel_Request( + recipientChannel: remoteId, + requestType: SSHChannelRequestType.authAgent, + wantReply: true, + ), + ); + return await _requestReplyQueue.next; + } + + Future sendSubsystem(String subsystem) async { + sendMessage( + SSH_Message_Channel_Request.subsystem( + recipientChannel: remoteId, + subsystemName: subsystem, + wantReply: true, + ), + ); + return await _requestReplyQueue.next; + } + + Future sendEnv(String name, String value) async { + sendMessage( + SSH_Message_Channel_Request.env( + recipientChannel: remoteId, + variableName: name, + variableValue: value, + wantReply: true, + ), + ); + return await _requestReplyQueue.next; + } + + void sendSignal(String signal) { + sendMessage( + SSH_Message_Channel_Request.signal( + recipientChannel: remoteId, + signalName: signal, + ), + ); + } + + void sendTerminalWindowChange({ + required int width, + required int height, + required int pixelWidth, + required int pixelHeight, + }) { + sendMessage( + SSH_Message_Channel_Request.windowChange( + recipientChannel: remoteId, + termWidth: width, + termHeight: height, + termPixelWidth: pixelWidth, + termPixelHeight: pixelHeight, + ), + ); + } + + void handleMessage(SSHMessage message) { + if (message is SSH_Message_Channel_Data) { + _handleDataMessage(message.data); + } else if (message is SSH_Message_Channel_Extended_Data) { + _handleDataMessage(message.data, type: message.dataTypeCode); + } else if (message is SSH_Message_Channel_Window_Adjust) { + _handleWindowAdjustMessage(message.bytesToAdd); + } else if (message is SSH_Message_Channel_EOF) { + _handleEOFMessage(); + } else if (message is SSH_Message_Channel_Close) { + _handleCloseMessage(); + } else if (message is SSH_Message_Channel_Request) { + _handleRequestMessage(message); + } else if (message is SSH_Message_Channel_Success) { + _handleRequestSuccessMessage(); + } else if (message is SSH_Message_Channel_Failure) { + _handleRequestFailureMessage(); + } else { + throw UnimplementedError('Unimplemented message: $message'); + } + } + + /// Closes our side of the channel. Returns a [Future] that completes when + /// the remote side has closed the channel. + Future close() async { + if (_done.isCompleted) return; + + _localStreamConsumer.cancel(); + _sendEOFIfNeeded(); + + if (_remoteStream.isClosed) { + _sendCloseIfNeeded(); + _done.complete(); + return; + } + + return _done.future; + } + + /// Closes the channel immediately in both directions. This may send a close + /// message to the remote side. After this no more data can be sent or + /// received. + void destroy() { + if (_done.isCompleted) return; + _remoteStream.close(); + _localStreamConsumer.cancel(); + _sendEOFIfNeeded(); + _sendCloseIfNeeded(); + _done.complete(); + } + + void _handleWindowAdjustMessage(int bytesToAdd) { + printDebug?.call('SSHChannel._handleWindowAdjustMessage: $bytesToAdd'); + + if (bytesToAdd < 0) { + throw ArgumentError.value(bytesToAdd, 'bytesToAdd', 'must be positive'); + } + + final next = _remoteWindow + bytesToAdd; + _remoteWindow = next & 0xFFFFFFFF; // 2³²-1 Overflow + + if (_remoteWindow > 0) { + _uploadLoop.activate(); + } + } + + void _handleDataMessage(Uint8List data, {int? type}) { + printDebug?.call('SSHChannel._handleDataMessage: len=${data.length}'); + + if (_remoteStream.isClosed) { + printDebug?.call('SSHChannel._handleDataMessage: remote already closed'); + return; + } + + _remoteStream.add(SSHChannelData(data, type: type)); + + _localWindow -= data.length; + if (_localWindow < 0) { + // Log and recover by issuing a window adjust; negative indicates protocol + // mismatch or delay in flow control from the remote. We prefer recovery + // to hard-closing for better interoperability. + printDebug?.call( + 'SSHChannel._handleDataMessage: local window underflow ' + '(localWindow=$_localWindow); recovering with WINDOW_ADJUST', + ); + } + + _sendWindowAdjustIfNeeded(); + } + + void _handleRequestMessage(SSH_Message_Channel_Request request) { + printDebug?.call('SSHChannel._handleRequest: ${request.requestType}'); + + final success = _requestHandler(request); + if (!request.wantReply) return; + success ? _sendRequestSuccess() : _sendRequestFailure(); + } + + void _handleRequestSuccessMessage() { + printDebug?.call('SSHChannel._handleRequestSuccessMessage'); + _requestReplyQueue.add(true); + } + + void _handleRequestFailureMessage() { + printDebug?.call('SSHChannel._handleRequestFailureMessage'); + _requestReplyQueue.add(false); + } + + void _handleEOFMessage() { + printDebug?.call('SSHChannel._handleEOFMessage'); + _remoteStream.close(); + } + + void _handleCloseMessage() { + printDebug?.call('SSHChannel._handleCLoseMessage'); + _remoteStream.close(); + close(); + } + + bool _defaultRequestHandler(SSH_Message_Channel_Request request) { + return false; + } + + void _sendEOFIfNeeded() { + printDebug?.call('SSHChannel._sendEOFIfNeeded'); + if (_done.isCompleted) return; + if (_hasSentEOF) return; + _hasSentEOF = true; + sendMessage(SSH_Message_Channel_EOF(recipientChannel: remoteId)); + } + + void _sendCloseIfNeeded() { + printDebug?.call('SSHChannel._sendCloseIfNeeded'); + if (_done.isCompleted) return; + if (_hasSentClose) return; + _hasSentClose = true; + + try { + sendMessage(SSH_Message_Channel_Close(recipientChannel: remoteId)); + } catch (e) { + printDebug?.call('SSHChannelController._sendCloseIfNeeded - error: $e'); + } + } + + void _sendRequestSuccess() { + printDebug?.call('SSHChannel._sendRequestSuccess'); + sendMessage(SSH_Message_Channel_Success(recipientChannel: remoteId)); + } + + void _sendRequestFailure() { + printDebug?.call('SSHChannel._sendRequestFailure'); + sendMessage(SSH_Message_Channel_Failure(recipientChannel: remoteId)); + } + + void _sendWindowAdjustIfNeeded() { + printDebug?.call('SSHChannel._sendWindowAdjustIfNeeded'); + + if (_done.isCompleted) return; + if (_remoteStream.isPaused) return; + + // Only send a window adjust message if the window is below the threshold. + // Assumes _windowAdjustThreshold is non-negative. + // If _localWindow is negative (an invalid state), it will likely be <= _windowAdjustThreshold. + if (_localWindow > _windowAdjustThreshold) return; + + const maxRfcWindowSize = 0xFFFFFFFF; // 2^32 - 1 + + // Determine the target window size, respecting RFC limits. + // localInitialWindowSize is typically a positive value (e.g., 2MB). + final int targetWindow = (localInitialWindowSize > maxRfcWindowSize || + localInitialWindowSize < 0) + ? maxRfcWindowSize + : localInitialWindowSize; + + // If the current local window is already at or above the target, + // no further increase is needed. + if (_localWindow >= targetWindow) { + // As a defensive measure, if _localWindow somehow exceeded the absolute max, + // cap it. This is more about correcting an invalid state than calculating + // bytesToAdd for this specific adjustment. + if (_localWindow > maxRfcWindowSize) { + _localWindow = maxRfcWindowSize; + } + return; + } + + // At this point, _localWindow < targetWindow. + // Calculate the number of bytes to add to reach the targetWindow. + // This will be a positive value. If _localWindow was negative (erroneous state), + // bytesToAdd will be appropriately larger to compensate. + final int bytesToAdd = targetWindow - _localWindow; + + // Update our local window state to reflect the window size we are now advertising. + _localWindow = targetWindow; + + sendMessage(SSH_Message_Channel_Window_Adjust( + recipientChannel: remoteId, + bytesToAdd: bytesToAdd, + )); + } + + late final _uploadLoop = OnceSimultaneously(() async { + while (true) { + if (_remoteWindow <= 0) { + return; + } + + final dataToRead = min(_remoteWindow, remoteMaximumPacketSize); + final data = await _localStreamConsumer.read(dataToRead); + + if (data == null) { + _sendEOFIfNeeded(); + + if (_remoteStream.isClosed) { + close(); + } + return; + } + + if (_hasSentEOF) { + return; + } + + printDebug?.call('SSHChannel._uploadLoop: len=${data.bytes.length}'); + + final message = data.isExtendedData + ? SSH_Message_Channel_Extended_Data( + recipientChannel: remoteId, + dataTypeCode: data.type!, + data: data.bytes, + ) + : SSH_Message_Channel_Data( + recipientChannel: remoteId, + data: data.bytes, + ); + + sendMessage(message); + + _remoteWindow -= data.bytes.length; + } + }); +} + +class SSHChannel { + /// The channel id on the local side. + SSHChannelId get channelId => _controller.localId; + + /// The channel id on the remote side. + SSHChannelId get remoteChannelId => _controller.localId; + + /// The maximum packet size that the remote side can receive. + int get maximumPacketSize => _controller.remoteMaximumPacketSize; + + /// A [Stream] of data received from the remote side. + Stream get stream => _controller._remoteStream.stream; + + /// A [StreamSink] that sends data to the remote side. Chucks must be + /// equal to or less than [maximumPacketSize]. + StreamSink get sink => _controller._localStream.sink; + + Future get done => _controller._done.future; + + SSHChannel(this._controller); + + final SSHChannelController _controller; + + /// Send data to the remote side. + void addData(Uint8List data, {int? type}) { + sink.add(SSHChannelData(data, type: type)); + } + + void setRequestHandler(SSHChannelRequestHandler handler) { + _controller._requestHandler = handler; + } + + Future sendExec(String command) async { + return await _controller.sendExec(command); + } + + Future sendShell() async { + return await _controller.sendShell(); + } + + Future sendX11Req({ + bool singleConnection = false, + String authenticationProtocol = 'MIT-MAGIC-COOKIE-1', + required String authenticationCookie, + int screenNumber = 0, + }) async { + return await _controller.sendX11Req( + singleConnection: singleConnection, + authenticationProtocol: authenticationProtocol, + authenticationCookie: authenticationCookie, + screenNumber: screenNumber, + ); + } + + void sendTerminalWindowChange({ + required int width, + required int height, + int pixelWidth = 0, + int pixelHeight = 0, + }) { + _controller.sendTerminalWindowChange( + width: width, + height: height, + pixelWidth: pixelWidth, + pixelHeight: pixelHeight, + ); + } + + void sendSignal(String signal) { + _controller.sendSignal(signal); + } + + /// Closes our side of the channel. Returns a [Future] that completes when + /// both sides of the channel are closed. + Future close() { + _controller._sendEOFIfNeeded(); + return _controller.close(); + } + + /// Destroys the channel in both directions. After calling this method, + /// no more data can be sent or received. + void destroy() => _controller.destroy(); + + @override + String toString() => 'SSHChannel($channelId:$remoteChannelId)'; +} + +class SSHChannelData { + /// Type of the data. Not null if the data is extended data. See: [SSHChannelExtendedDataType] + final int? type; + + final Uint8List bytes; + + bool get isExtendedData => type != null; + + SSHChannelData(this.bytes, {this.type}); +} + +class SSHChannelExtendedDataType { + static const stderr = 1; +} + +class SSHChannelDataSplitter + extends StreamTransformerBase { + SSHChannelDataSplitter(this.maxSize); + + final int maxSize; + + @override + Stream bind(Stream stream) async* { + await for (var chunk in stream) { + if (chunk.bytes.length < maxSize) { + yield chunk; + continue; + } + + final blocks = chunk.bytes.length ~/ maxSize; + + for (var i = 0; i < blocks; i++) { + yield SSHChannelData( + Uint8List.sublistView(chunk.bytes, i * maxSize, (i + 1) * maxSize), + type: chunk.type, + ); + } + + if (blocks * maxSize < chunk.bytes.length) { + yield SSHChannelData( + Uint8List.sublistView(chunk.bytes, blocks * maxSize), + type: chunk.type, + ); + } + } + } +} + +class SSHChannelDataConsumer extends StreamConsumerBase { + SSHChannelDataConsumer(super.stream); + + @override + int getLength(SSHChannelData chunk) { + return chunk.bytes.length; + } + + @override + SSHChannelData getSublistView(SSHChannelData chunk, int start, int end) { + return SSHChannelData( + Uint8List.sublistView(chunk.bytes, start, end), + type: chunk.type, + ); + } +} + +/// A function that can be invoked at most once simultaneously. +class OnceSimultaneously { + OnceSimultaneously(this._fn); + + final Future Function() _fn; + + var _isRunning = false; + + /// Call the function. If the function is already running, this is a no-op. + void activate() async { + if (_isRunning) return; + _isRunning = true; + try { + await _fn(); + } finally { + _isRunning = false; + } + } +} From ccda339608c26157212ba4f0bc9b64efcf48ba9e Mon Sep 17 00:00:00 2001 From: GT610 Date: Sat, 16 May 2026 13:58:21 +0800 Subject: [PATCH 2/3] Fix --- lib/src/sftp/sftp_client.dart | 33 ++++++++++++++++++++++----------- lib/src/ssh_channel.dart | 8 ++++++-- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/lib/src/sftp/sftp_client.dart b/lib/src/sftp/sftp_client.dart index 53ad054..c784e40 100644 --- a/lib/src/sftp/sftp_client.dart +++ b/lib/src/sftp/sftp_client.dart @@ -23,6 +23,9 @@ const _kReadChunkSize = 16 * 1024; const _kReadMaxPendingRequests = 64; const _kDownloadChunkSize = 64 * 1024; const _kDownloadMaxPendingRequests = 128; +// 16MB SFTP packet size limit (implementation DoS/safety limit, not protocol +// requirement; RFC 4253 / SFTP has no fixed maximum but 16MB is reasonable). +const _kMaxPacketSize = 16 * 1024 * 1024; class SftpClient { final SSHChannel _channel; @@ -37,13 +40,17 @@ class SftpClient { _handleData, onError: (Object e, _) { print('[SFTP] stream onError: $e'); - if (!_done.isCompleted) { + try { _done.completeError(e); + } on StateError { + // Ignore duplicate completion. } }, onDone: () { - if (!_done.isCompleted) { + try { _done.complete(); + } on StateError { + // Ignore duplicate completion. } }, ); @@ -255,7 +262,10 @@ class SftpClient { waiter.completeError(error, stackTrace); } _replyWaiters.clear(); - _done.completeError(error, stackTrace); + _buffer.clear(); + if (!_done.isCompleted) { + _done.completeError(error, stackTrace); + } } void _startHandshake() { @@ -461,7 +471,7 @@ class SftpClient { _buffer.add(data.bytes); _handlePackets(); } catch (e) { - print('[SFTP] _handleData ERROR: $e'); + _closeError(e); } } @@ -470,10 +480,13 @@ class SftpClient { while (_buffer.length >= lengthHeader) { try { final length = _buffer.byteData.getUint32(0); - // SFTP payload should not exceed a reasonable limit (16MB) - if (length > 16 * 1024 * 1024) { - print('[SFTP] _handlePackets suspicious length=$length bufferLen=${_buffer.length}'); - _buffer.clear(); + if (length > _kMaxPacketSize) { + _closeError( + SftpError( + 'Packet too large: length=$length ' + 'bufferLen=${_buffer.length}', + ), + ); return; } if (_buffer.length < lengthHeader + length) break; @@ -481,8 +494,7 @@ class SftpClient { final payload = Uint8List.sublistView(packet, lengthHeader); _handlePacket(payload); } catch (e) { - print('[SFTP] _handlePackets ERROR: $e'); - _buffer.clear(); + _closeError(e); return; } } @@ -506,7 +518,6 @@ class SftpClient { case SftpExtendedReplyPacket.packetType: return _handleExtendedReplyPacket(payload); default: - print('[SFTP] UNKNOWN packet type=$type'); printDebug?.call('SftpClient._handlePacket: unknown packet: $type'); } } diff --git a/lib/src/ssh_channel.dart b/lib/src/ssh_channel.dart index babe0f5..255e469 100644 --- a/lib/src/ssh_channel.dart +++ b/lib/src/ssh_channel.dart @@ -454,7 +454,7 @@ class SSHChannel { SSHChannelId get channelId => _controller.localId; /// The channel id on the remote side. - SSHChannelId get remoteChannelId => _controller.localId; + SSHChannelId get remoteChannelId => _controller.remoteId; /// The maximum packet size that the remote side can receive. int get maximumPacketSize => _controller.remoteMaximumPacketSize; @@ -553,7 +553,11 @@ class SSHChannelExtendedDataType { class SSHChannelDataSplitter extends StreamTransformerBase { - SSHChannelDataSplitter(this.maxSize); + SSHChannelDataSplitter(this.maxSize) { + if (maxSize <= 0) { + throw ArgumentError.value(maxSize, 'maxSize', 'must be positive'); + } + } final int maxSize; From 11b26fda4d68bee7d726191ea6c6af2d958dca09 Mon Sep 17 00:00:00 2001 From: GT610 Date: Sat, 16 May 2026 14:05:06 +0800 Subject: [PATCH 3/3] Fix --- lib/src/sftp/sftp_client.dart | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/src/sftp/sftp_client.dart b/lib/src/sftp/sftp_client.dart index c784e40..4fafcd4 100644 --- a/lib/src/sftp/sftp_client.dart +++ b/lib/src/sftp/sftp_client.dart @@ -40,6 +40,10 @@ class SftpClient { _handleData, onError: (Object e, _) { print('[SFTP] stream onError: $e'); + for (var waiter in _replyWaiters.values) { + waiter.completeError(e); + } + _replyWaiters.clear(); try { _done.completeError(e); } on StateError { @@ -47,6 +51,11 @@ class SftpClient { } }, onDone: () { + final error = SftpError('Stream closed'); + for (var waiter in _replyWaiters.values) { + waiter.completeError(error); + } + _replyWaiters.clear(); try { _done.complete(); } on StateError { @@ -263,6 +272,9 @@ class SftpClient { } _replyWaiters.clear(); _buffer.clear(); + if (!_handshake.isCompleted) { + _handshake.completeError(error, stackTrace); + } if (!_done.isCompleted) { _done.completeError(error, stackTrace); }