Files
ssh-manager/backend/src/main/java/com/sshmanager/controller/SftpController.java
liumangmang e2f600c264 Fix: 修复文件上传临时文件丢失问题
问题:
- Docker 环境下上传文件时出现 FileNotFoundException
- Tomcat 在异步任务执行前清理了临时文件 /tmp/tomcat.xxx/work/...

解决方案:
1. 配置 multipart.location 为持久化目录 ./data/upload-temp
2. 设置 file-size-threshold: 0 强制立即写入磁盘
3. 修改 SftpController.upload() 方法:
   - 在异步任务执行前将 MultipartFile 保存到持久化位置
   - 异步任务从保存的文件读取而非 MultipartFile.getInputStream()
   - 上传完成或失败后自动清理临时文件

影响范围:
- backend/src/main/resources/application.yml
- backend/src/main/java/com/sshmanager/controller/SftpController.java
2026-03-18 23:24:53 +08:00

1014 lines
41 KiB
Java

package com.sshmanager.controller;
import com.jcraft.jsch.SftpException;
import com.sshmanager.dto.SftpFileInfo;
import com.sshmanager.entity.Connection;
import com.sshmanager.entity.User;
import com.sshmanager.repository.UserRepository;
import com.sshmanager.service.ConnectionService;
import com.sshmanager.service.SftpService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@RestController
@RequestMapping("/api/sftp")
public class SftpController {
// Class-wide SLF4J logger.
private static final Logger log = LoggerFactory.getLogger(SftpController.class);
// Collaborators injected through the constructor.
private final ConnectionService connectionService;
private final UserRepository userRepository;
private final SftpService sftpService;
// Cached SFTP sessions, keyed by sessionKey(userId, connectionId).
private final Map<String, SftpService.SftpSession> sessions = new ConcurrentHashMap<>();
// Monitor objects used by withSessionLock/withTwoSessionLocks, keyed like `sessions`.
private final Map<String, Object> sessionLocks = new ConcurrentHashMap<>();
// Remote-to-remote transfer tasks, keyed by transferTaskKey(userId, taskId).
private final Map<String, TransferTaskStatus> transferTasks = new ConcurrentHashMap<>();
// Upload tasks, keyed by uploadTaskKey(userId, taskId).
private final Map<String, UploadTaskStatus> uploadTasks = new ConcurrentHashMap<>();
// Runs both upload tasks and remote transfer tasks in the background.
private final ExecutorService transferTaskExecutor = Executors.newCachedThreadPool();
// SSE subscribers per task key; shared by transfer and upload progress streams.
private final Map<String, CopyOnWriteArrayList<SseEmitter>> taskEmitters = new ConcurrentHashMap<>();
/**
 * Creates the controller with the services it delegates to.
 *
 * @param connectionService looks up connections and their decrypted credentials
 * @param userRepository    resolves the authenticated user name to a user record
 * @param sftpService       performs the actual SFTP operations
 */
public SftpController(ConnectionService connectionService,
                      UserRepository userRepository,
                      SftpService sftpService) {
    this.sftpService = sftpService;
    this.userRepository = userRepository;
    this.connectionService = connectionService;
}
/**
 * Resolves the id of the user behind the given authentication.
 *
 * @param auth the current request's authentication
 * @return the id of the matching user record
 * @throws IllegalStateException if no user exists for the authenticated name
 */
private Long getCurrentUserId(Authentication auth) {
    final String username = auth.getName();
    final User currentUser = userRepository.findByUsername(username)
            .orElseThrow(() -> new IllegalStateException("User not found"));
    return currentUser.getId();
}
/** Builds the {@code userId:connectionId} key used for the session and lock maps. */
private String sessionKey(Long userId, Long connectionId) {
    return new StringBuilder()
            .append(userId)
            .append(':')
            .append(connectionId)
            .toString();
}
/** Builds the {@code userId:taskId} key under which a transfer task is stored. */
private String transferTaskKey(Long userId, String taskId) {
    return new StringBuilder()
            .append(userId)
            .append(':')
            .append(taskId)
            .toString();
}
/** Builds the {@code userId:taskId} key under which an upload task is stored. */
private String uploadTaskKey(Long userId, String taskId) {
    return new StringBuilder()
            .append(userId)
            .append(':')
            .append(taskId)
            .toString();
}
/**
 * Runs {@code action} while holding the monitor associated with {@code key}.
 * The monitor object is created on first use and stored in {@code sessionLocks}.
 *
 * @param key    session key identifying the monitor
 * @param action work to perform under the lock
 * @return whatever {@code action} returns
 */
private <T> T withSessionLock(String key, Supplier<T> action) {
    final Object monitor = sessionLocks.computeIfAbsent(key, unused -> new Object());
    synchronized (monitor) {
        T outcome = action.get();
        return outcome;
    }
}
/**
 * Runs {@code action} while holding the monitors of two session keys.
 * The monitors are always acquired in lexicographic key order, so two callers
 * passing the same pair in opposite order take them in the same sequence.
 * When both keys are equal only a single monitor is taken.
 *
 * @param keyA   first session key
 * @param keyB   second session key
 * @param action work to perform under both locks
 * @return whatever {@code action} returns
 */
private <T> T withTwoSessionLocks(String keyA, String keyB, Supplier<T> action) {
    if (keyA.equals(keyB)) {
        return withSessionLock(keyA, action);
    }
    final boolean aSortsFirst = keyA.compareTo(keyB) < 0;
    final String lowerKey = aSortsFirst ? keyA : keyB;
    final String higherKey = aSortsFirst ? keyB : keyA;
    final Object outerMonitor = sessionLocks.computeIfAbsent(lowerKey, unused -> new Object());
    final Object innerMonitor = sessionLocks.computeIfAbsent(higherKey, unused -> new Object());
    synchronized (outerMonitor) {
        synchronized (innerMonitor) {
            return action.get();
        }
    }
}
/**
 * Returns the cached SFTP session for the user/connection pair, opening a new
 * one when none is cached or the cached one is no longer connected.
 * <p>
 * A cached session that reports itself as disconnected is explicitly
 * disconnected and evicted before a replacement is opened, so its resources
 * are released instead of being silently overwritten in the map.
 * Every successful call records an access for the expiry bookkeeping.
 *
 * @param connectionId id of the stored connection
 * @param userId       id of the owning user
 * @return a session obtained from the cache or freshly connected
 * @throws Exception if the connection lookup, credential decryption or connect fails
 */
private SftpService.SftpSession getOrCreateSession(Long connectionId, Long userId) throws Exception {
    String key = sessionKey(userId, connectionId);
    SftpService.SftpSession session = sessions.get(key);
    if (session != null && !session.isConnected()) {
        // Evict only if the map still holds this exact instance.
        sessions.remove(key, session);
        try {
            session.disconnect();
        } catch (RuntimeException e) {
            // Best effort: a failure to tear down a dead session must not block reconnecting.
            log.debug("Failed to disconnect stale SFTP session: {}", key, e);
        }
        session = null;
    }
    if (session == null) {
        Connection conn = connectionService.getConnectionForSsh(connectionId, userId);
        String password = connectionService.getDecryptedPassword(conn);
        String privateKey = connectionService.getDecryptedPrivateKey(conn);
        String passphrase = connectionService.getDecryptedPassphrase(conn);
        session = sftpService.connect(conn, password, privateKey, passphrase);
        sessions.put(key, session);
    }
    cleanupTask.recordAccess(key);
    return session;
}
/**
 * Lists the entries of a remote directory.
 * On any failure inside the locked section the cached session is dropped and
 * disconnected; the caller receives a 500 response with an {@code error} field.
 *
 * @param connectionId   id of the stored connection
 * @param path           remote directory, defaults to {@code "."}
 * @param authentication current user's authentication
 * @return 200 with a list of {@link SftpFileInfo}, or 500 with an error map
 */
@GetMapping("/list")
public ResponseEntity<?> list(
        @RequestParam Long connectionId,
        @RequestParam(required = false, defaultValue = ".") String path,
        Authentication authentication) {
    try {
        final Long currentUserId = getCurrentUserId(authentication);
        final String lockKey = sessionKey(currentUserId, connectionId);
        return withSessionLock(lockKey, () -> {
            try {
                SftpService.SftpSession sftp = getOrCreateSession(connectionId, currentUserId);
                List<SftpService.FileInfo> entries = sftpService.listFiles(sftp, path);
                List<SftpFileInfo> listing = new java.util.ArrayList<>(entries.size());
                for (SftpService.FileInfo entry : entries) {
                    listing.add(new SftpFileInfo(entry.name, entry.directory, entry.size, entry.mtime));
                }
                return ResponseEntity.ok(listing);
            } catch (Exception failure) {
                // Discard the session so the next request reconnects from scratch.
                SftpService.SftpSession cached = sessions.remove(lockKey);
                if (cached != null) {
                    cached.disconnect();
                }
                throw new RuntimeException(failure);
            }
        });
    } catch (Exception e) {
        final String errorMsg = toSftpErrorMessage(e, path, "list");
        log.warn("SFTP list failed: connectionId={}, path={}, error={}", connectionId, path, errorMsg, e);
        Map<String, String> body = new HashMap<>();
        body.put("error", errorMsg);
        return ResponseEntity.status(500).body(body);
    }
}
/**
 * Derives a user-facing message from a failure raised during an SFTP operation.
 * <p>
 * The cause chain (at most 10 levels) is first searched for an
 * {@link SftpException}; if one is found it is formatted via
 * {@code SftpService.formatSftpExceptionMessage}. This search must come before
 * any message check, because callers wrap failures in
 * {@code new RuntimeException(cause)}, whose message is never null and would
 * otherwise always win. If no {@link SftpException} is present, the first
 * non-blank message in the chain is returned.
 *
 * @param e         the failure, possibly a wrapper around the real cause
 * @param path      remote path involved in the operation
 * @param operation short operation name, also used for the fallback text
 * @return a non-null message; {@code operation + " failed"} when nothing better exists
 */
private String toSftpErrorMessage(Exception e, String path, String operation) {
    final int maxDepth = 10;
    Throwable cur = e;
    for (int depth = 0; depth < maxDepth && cur != null; depth++) {
        if (cur instanceof SftpException) {
            return SftpService.formatSftpExceptionMessage((SftpException) cur, path, operation);
        }
        cur = cur.getCause();
    }
    cur = e;
    for (int depth = 0; depth < maxDepth && cur != null; depth++) {
        String message = cur.getMessage();
        if (message != null && !message.trim().isEmpty()) {
            return message;
        }
        cur = cur.getCause();
    }
    return operation + " failed";
}
/**
 * Checks that both transfer paths are present and not blank.
 *
 * @param sourcePath remote source path
 * @param targetPath remote target path
 * @return a 400 response naming the first missing path, or {@code null} when both are valid
 */
private ResponseEntity<Map<String, String>> validateTransferPaths(String sourcePath, String targetPath) {
    final String problem;
    if (sourcePath == null || sourcePath.trim().isEmpty()) {
        problem = "sourcePath is required";
    } else if (targetPath == null || targetPath.trim().isEmpty()) {
        problem = "targetPath is required";
    } else {
        return null;
    }
    Map<String, String> body = new HashMap<>();
    body.put("error", problem);
    return ResponseEntity.badRequest().body(body);
}
/**
 * Copies a remote file from one connection to another while holding both
 * session locks, reporting progress into {@code status}.
 * If anything fails, the sessions involved are removed from the cache and
 * disconnected, and the failure is rethrown wrapped in a {@link RuntimeException}.
 *
 * @param userId             owner of both connections
 * @param sourceConnectionId connection to read from
 * @param sourcePath         remote source path (trimmed before use)
 * @param targetConnectionId connection to write to
 * @param targetPath         remote target path (trimmed before use)
 * @param status             receives progress updates
 */
private void executeTransfer(Long userId,
                             Long sourceConnectionId,
                             String sourcePath,
                             Long targetConnectionId,
                             String targetPath,
                             TransferTaskStatus status) throws Exception {
    final String fromPath = sourcePath.trim();
    final String toPath = targetPath.trim();
    final String fromKey = sessionKey(userId, sourceConnectionId);
    final String toKey = sessionKey(userId, targetConnectionId);
    final SftpService.TransferProgressListener progressSink = new SftpService.TransferProgressListener() {
        @Override
        public void onStart(long totalBytes) {
            status.setProgress(0, totalBytes);
        }

        @Override
        public void onProgress(long transferredBytes, long totalBytes) {
            status.setProgress(transferredBytes, totalBytes);
        }
    };
    withTwoSessionLocks(fromKey, toKey, () -> {
        try {
            SftpService.SftpSession from = getOrCreateSession(sourceConnectionId, userId);
            SftpService.SftpSession to = getOrCreateSession(targetConnectionId, userId);
            sftpService.transferRemote(from, fromPath, to, toPath, progressSink);
            return null;
        } catch (Exception failure) {
            SftpService.SftpSession staleSource = sessions.remove(fromKey);
            if (staleSource != null) {
                staleSource.disconnect();
            }
            // Same key means the same session, which was already handled above.
            if (!fromKey.equals(toKey)) {
                SftpService.SftpSession staleTarget = sessions.remove(toKey);
                if (staleTarget != null) {
                    staleTarget.disconnect();
                }
            }
            throw new RuntimeException(failure);
        }
    });
}
/**
 * Returns the current remote working directory of the session.
 *
 * @param connectionId   id of the stored connection
 * @param authentication current user's authentication
 * @return 200 with {@code {"path": ...}}, or 500 with an {@code error} field
 */
@GetMapping("/pwd")
public ResponseEntity<Map<String, String>> pwd(
        @RequestParam Long connectionId,
        Authentication authentication) {
    try {
        final Long currentUserId = getCurrentUserId(authentication);
        final String lockKey = sessionKey(currentUserId, connectionId);
        return withSessionLock(lockKey, () -> {
            try {
                SftpService.SftpSession sftp = getOrCreateSession(connectionId, currentUserId);
                Map<String, String> body = new HashMap<>();
                body.put("path", sftpService.pwd(sftp));
                return ResponseEntity.ok(body);
            } catch (Exception failure) {
                // Drop the cached session so the next call reconnects.
                SftpService.SftpSession cached = sessions.remove(lockKey);
                if (cached != null) {
                    cached.disconnect();
                }
                throw new RuntimeException(failure);
            }
        });
    } catch (Exception e) {
        log.warn("SFTP pwd failed: connectionId={}", connectionId, e);
        String message = e.getMessage();
        if (message == null) {
            message = "pwd failed";
        }
        Map<String, String> body = new HashMap<>();
        body.put("error", message);
        return ResponseEntity.status(500).body(body);
    }
}
/**
 * Streams a remote file to the client as an attachment.
 * <p>
 * The attachment name is the last segment of {@code path}. Because it is
 * placed inside a quoted {@code Content-Disposition} parameter, double quotes,
 * backslashes and control characters are replaced with {@code '_'} so a
 * crafted remote file name cannot break or extend the header. An empty last
 * segment (path ending in {@code '/'}) falls back to {@code "download"}.
 * <p>
 * The transfer itself runs inside the streaming body under the session lock;
 * if it fails, the cached session is dropped and disconnected.
 *
 * @param connectionId   id of the stored connection
 * @param path           remote file path
 * @param authentication current user's authentication
 * @return 200 with a streaming body, or 500 if the response could not be prepared
 */
@GetMapping("/download")
public ResponseEntity<StreamingResponseBody> download(
        @RequestParam Long connectionId,
        @RequestParam String path,
        Authentication authentication) {
    try {
        Long userId = getCurrentUserId(authentication);
        String key = sessionKey(userId, connectionId);
        String rawName = path.contains("/") ? path.substring(path.lastIndexOf('/') + 1) : path;
        StringBuilder safeName = new StringBuilder(rawName.length());
        for (int i = 0; i < rawName.length(); i++) {
            char c = rawName.charAt(i);
            boolean unsafe = c == '"' || c == '\\' || c < 0x20 || c == 0x7f;
            safeName.append(unsafe ? '_' : c);
        }
        String filename = safeName.length() == 0 ? "download" : safeName.toString();
        StreamingResponseBody stream = outputStream -> withSessionLock(key, () -> {
            try {
                SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
                sftpService.download(session, path, outputStream);
                outputStream.flush();
                return null;
            } catch (Exception e) {
                SftpService.SftpSession existing = sessions.remove(key);
                if (existing != null) {
                    existing.disconnect();
                }
                throw new RuntimeException(e);
            }
        });
        return ResponseEntity.ok()
                .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
                .contentType(MediaType.APPLICATION_OCTET_STREAM)
                .body(stream);
    } catch (Exception e) {
        log.warn("SFTP download failed: connectionId={}, path={}", connectionId, path, e);
        return ResponseEntity.status(500).build();
    }
}
/**
 * Accepts a multipart upload, stores it in a local staging file and starts a
 * background task that pushes it to the remote host over SFTP.
 * <p>
 * Details that matter:
 * <ul>
 *   <li>The client-supplied file name is reduced to its last path segment
 *       (both {@code '/'} and {@code '\\'} are treated as separators) and
 *       rejected with 400 when nothing usable remains, so it cannot escape the
 *       target directory.</li>
 *   <li>The staging file is named after the task id only, so the client name
 *       never becomes part of a local path.</li>
 *   <li>The staging directory is made absolute before
 *       {@code MultipartFile.transferTo}; servlet containers resolve a relative
 *       destination against the configured multipart location rather than the
 *       working directory.</li>
 *   <li>The staging file is deleted when the background task ends, whether it
 *       succeeded or failed.</li>
 * </ul>
 *
 * @param connectionId   id of the stored connection
 * @param path           remote directory to upload into; empty means {@code "/"}
 * @param file           uploaded multipart file
 * @param authentication current user's authentication
 * @return 200 with {@code taskId}/{@code message}, 400 for an unusable file
 *         name, or 500 with an {@code error} field if the task could not be started
 */
@PostMapping("/upload")
public ResponseEntity<Map<String, Object>> upload(
        @RequestParam Long connectionId,
        @RequestParam String path,
        @RequestParam("file") MultipartFile file,
        Authentication authentication) {
    java.io.File tempFile = null;
    String taskKey = null;
    try {
        Long userId = getCurrentUserId(authentication);

        // Keep only the last path segment of whatever the client sent.
        String submittedName = file.getOriginalFilename();
        String normalizedName = submittedName == null ? "" : submittedName.replace('\\', '/');
        final String remoteName = normalizedName.substring(normalizedName.lastIndexOf('/') + 1);
        if (remoteName.trim().isEmpty() || ".".equals(remoteName) || "..".equals(remoteName)) {
            Map<String, Object> error = new HashMap<>();
            error.put("error", "A valid file name is required");
            return ResponseEntity.badRequest().body(error);
        }

        String taskId = UUID.randomUUID().toString();
        taskKey = uploadTaskKey(userId, taskId);

        // Save file to persistent location before async processing
        java.io.File uploadTempDir = new java.io.File("./data/upload-temp").getAbsoluteFile();
        if (!uploadTempDir.isDirectory() && !uploadTempDir.mkdirs() && !uploadTempDir.isDirectory()) {
            throw new IOException("Cannot create upload temp directory: " + uploadTempDir);
        }
        tempFile = new java.io.File(uploadTempDir, taskId + ".upload");
        file.transferTo(tempFile);
        final java.io.File savedFile = tempFile;

        final String remoteDir = (path == null || path.isEmpty()) ? "/" : path;
        final String remotePath = remoteDir.endsWith("/") ? remoteDir + remoteName : remoteDir + "/" + remoteName;

        final UploadTaskStatus status = new UploadTaskStatus(taskId, userId, connectionId,
                path, remoteName, file.getSize());
        status.setController(this);
        uploadTasks.put(taskKey, status);

        Future<?> future = transferTaskExecutor.submit(() -> {
            status.setStatus("running");
            String key = sessionKey(userId, connectionId);
            try {
                withSessionLock(key, () -> {
                    try {
                        SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
                        AtomicLong transferred = new AtomicLong(0);
                        try (java.io.InputStream in = new java.io.FileInputStream(savedFile)) {
                            sftpService.upload(session, remotePath, in, new SftpService.TransferProgressListener() {
                                @Override
                                public void onStart(long totalBytes) {
                                    status.setProgress(0, totalBytes);
                                }

                                @Override
                                public void onProgress(long count, long totalBytes) {
                                    // count is accumulated here and reported against the known file size.
                                    long current = transferred.addAndGet(count);
                                    status.setProgress(current, status.getTotalBytes());
                                }
                            });
                        }
                        return null;
                    } catch (Exception e) {
                        SftpService.SftpSession existing = sessions.remove(key);
                        if (existing != null) {
                            existing.disconnect();
                        }
                        throw new RuntimeException(e);
                    }
                });
                status.markSuccess();
            } catch (Exception e) {
                status.markError(toSftpErrorMessage(e, remotePath, "upload"));
                log.warn("SFTP upload task failed: taskId={}, connectionId={}, remotePath={}",
                        taskId, connectionId, remotePath, e);
            } finally {
                // Single cleanup point for the staging file, success or failure.
                if (savedFile.exists() && !savedFile.delete()) {
                    log.warn("Could not delete upload temp file: {}", savedFile);
                }
            }
        });
        status.setFuture(future);

        Map<String, Object> result = new HashMap<>();
        result.put("taskId", taskId);
        result.put("message", "Upload started");
        return ResponseEntity.ok(result);
    } catch (Exception e) {
        log.warn("SFTP upload could not be started: connectionId={}, path={}", connectionId, path, e);
        // Do not leave a task behind that will never run.
        if (taskKey != null) {
            uploadTasks.remove(taskKey);
        }
        if (tempFile != null && tempFile.exists() && !tempFile.delete()) {
            log.warn("Could not delete upload temp file: {}", tempFile);
        }
        Map<String, Object> error = new HashMap<>();
        error.put("error", e.getMessage() != null ? e.getMessage() : "Upload failed");
        return ResponseEntity.status(500).body(error);
    }
}
/**
 * Deletes a remote file or directory.
 * Failures are logged and reported through {@code toSftpErrorMessage}, so the
 * {@code error} field is never null.
 *
 * @param connectionId   id of the stored connection
 * @param path           remote path to delete
 * @param directory      whether {@code path} is a directory
 * @param authentication current user's authentication
 * @return 200 with {@code {"message": "Deleted"}}, or 500 with an {@code error} field
 */
@DeleteMapping("/delete")
public ResponseEntity<Map<String, String>> delete(
        @RequestParam Long connectionId,
        @RequestParam String path,
        @RequestParam boolean directory,
        Authentication authentication) {
    try {
        Long userId = getCurrentUserId(authentication);
        String key = sessionKey(userId, connectionId);
        return withSessionLock(key, () -> {
            try {
                SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
                sftpService.delete(session, path, directory);
                Map<String, String> result = new HashMap<>();
                result.put("message", "Deleted");
                return ResponseEntity.ok(result);
            } catch (Exception e) {
                // Drop the cached session so the next call reconnects.
                SftpService.SftpSession existing = sessions.remove(key);
                if (existing != null) {
                    existing.disconnect();
                }
                throw new RuntimeException(e);
            }
        });
    } catch (Exception e) {
        String errorMsg = toSftpErrorMessage(e, path, "delete");
        log.warn("SFTP delete failed: connectionId={}, path={}, error={}", connectionId, path, errorMsg, e);
        Map<String, String> error = new HashMap<>();
        error.put("error", errorMsg);
        return ResponseEntity.status(500).body(error);
    }
}
/**
 * Creates a remote directory.
 * Failures are logged and reported through {@code toSftpErrorMessage}, so the
 * {@code error} field is never null.
 *
 * @param connectionId   id of the stored connection
 * @param path           remote directory to create
 * @param authentication current user's authentication
 * @return 200 with {@code {"message": "Created"}}, or 500 with an {@code error} field
 */
@PostMapping("/mkdir")
public ResponseEntity<Map<String, String>> mkdir(
        @RequestParam Long connectionId,
        @RequestParam String path,
        Authentication authentication) {
    try {
        Long userId = getCurrentUserId(authentication);
        String key = sessionKey(userId, connectionId);
        return withSessionLock(key, () -> {
            try {
                SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
                sftpService.mkdir(session, path);
                Map<String, String> result = new HashMap<>();
                result.put("message", "Created");
                return ResponseEntity.ok(result);
            } catch (Exception e) {
                // Drop the cached session so the next call reconnects.
                SftpService.SftpSession existing = sessions.remove(key);
                if (existing != null) {
                    existing.disconnect();
                }
                throw new RuntimeException(e);
            }
        });
    } catch (Exception e) {
        String errorMsg = toSftpErrorMessage(e, path, "mkdir");
        log.warn("SFTP mkdir failed: connectionId={}, path={}, error={}", connectionId, path, errorMsg, e);
        Map<String, String> error = new HashMap<>();
        error.put("error", errorMsg);
        return ResponseEntity.status(500).body(error);
    }
}
/**
 * Renames (moves) a remote file or directory.
 * Failures are logged and reported through {@code toSftpErrorMessage}, so the
 * {@code error} field is never null.
 *
 * @param connectionId   id of the stored connection
 * @param oldPath        existing remote path
 * @param newPath        new remote path
 * @param authentication current user's authentication
 * @return 200 with {@code {"message": "Renamed"}}, or 500 with an {@code error} field
 */
@PostMapping("/rename")
public ResponseEntity<Map<String, String>> rename(
        @RequestParam Long connectionId,
        @RequestParam String oldPath,
        @RequestParam String newPath,
        Authentication authentication) {
    try {
        Long userId = getCurrentUserId(authentication);
        String key = sessionKey(userId, connectionId);
        return withSessionLock(key, () -> {
            try {
                SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
                sftpService.rename(session, oldPath, newPath);
                Map<String, String> result = new HashMap<>();
                result.put("message", "Renamed");
                return ResponseEntity.ok(result);
            } catch (Exception e) {
                // Drop the cached session so the next call reconnects.
                SftpService.SftpSession existing = sessions.remove(key);
                if (existing != null) {
                    existing.disconnect();
                }
                throw new RuntimeException(e);
            }
        });
    } catch (Exception e) {
        String errorMsg = toSftpErrorMessage(e, oldPath, "rename");
        log.warn("SFTP rename failed: connectionId={}, oldPath={}, newPath={}, error={}",
                connectionId, oldPath, newPath, errorMsg, e);
        Map<String, String> error = new HashMap<>();
        error.put("error", errorMsg);
        return ResponseEntity.status(500).body(error);
    }
}
/**
 * Performs a remote-to-remote transfer synchronously within the request.
 * The status object created here is not registered anywhere; it only serves
 * as the progress sink required by {@code executeTransfer}.
 *
 * @param sourceConnectionId connection to read from
 * @param sourcePath         remote source path
 * @param targetConnectionId connection to write to
 * @param targetPath         remote target path
 * @param authentication     current user's authentication
 * @return 200 with {@code {"message": "Transferred"}}, 400 for a missing path,
 *         or 500 with an {@code error} field
 */
@PostMapping("/transfer-remote")
public ResponseEntity<Map<String, String>> transferRemote(
        @RequestParam Long sourceConnectionId,
        @RequestParam String sourcePath,
        @RequestParam Long targetConnectionId,
        @RequestParam String targetPath,
        Authentication authentication) {
    try {
        final Long currentUserId = getCurrentUserId(authentication);
        final ResponseEntity<Map<String, String>> rejection = validateTransferPaths(sourcePath, targetPath);
        if (rejection != null) {
            return rejection;
        }
        final String transientTaskId = UUID.randomUUID().toString();
        final TransferTaskStatus progressSink = new TransferTaskStatus(
                transientTaskId, currentUserId, sourceConnectionId, targetConnectionId,
                sourcePath.trim(), targetPath.trim());
        executeTransfer(currentUserId, sourceConnectionId, sourcePath, targetConnectionId, targetPath, progressSink);
        Map<String, String> body = new HashMap<>();
        body.put("message", "Transferred");
        return ResponseEntity.ok(body);
    } catch (Exception e) {
        String message = e.getMessage();
        if (message == null) {
            message = "Transfer failed";
        }
        Map<String, String> body = new HashMap<>();
        body.put("error", message);
        return ResponseEntity.status(500).body(body);
    }
}
/**
 * Registers a remote-to-remote transfer as a background task and returns its
 * initial status immediately.
 * The worker marks the task cancelled when its thread is interrupted, as
 * success when the transfer returns normally, and as error otherwise.
 *
 * @param sourceConnectionId connection to read from
 * @param sourcePath         remote source path
 * @param targetConnectionId connection to write to
 * @param targetPath         remote target path
 * @param authentication     current user's authentication
 * @return 200 with the task status map, or the validation error status with an {@code error} field
 */
@PostMapping("/transfer-remote/tasks")
public ResponseEntity<Map<String, Object>> createTransferRemoteTask(
        @RequestParam Long sourceConnectionId,
        @RequestParam String sourcePath,
        @RequestParam Long targetConnectionId,
        @RequestParam String targetPath,
        Authentication authentication) {
    final Long currentUserId = getCurrentUserId(authentication);
    final ResponseEntity<Map<String, String>> rejection = validateTransferPaths(sourcePath, targetPath);
    if (rejection != null) {
        // Re-wrap the String-valued body to match this endpoint's response type.
        Map<String, Object> body = new HashMap<>(rejection.getBody());
        return ResponseEntity.status(rejection.getStatusCode()).body(body);
    }
    final String newTaskId = UUID.randomUUID().toString();
    final TransferTaskStatus task = new TransferTaskStatus(
            newTaskId, currentUserId, sourceConnectionId, targetConnectionId,
            sourcePath.trim(), targetPath.trim());
    task.setController(this);
    transferTasks.put(transferTaskKey(currentUserId, task.getTaskId()), task);
    Future<?> handle = transferTaskExecutor.submit(() -> {
        task.setStatus("running");
        try {
            if (Thread.currentThread().isInterrupted()) {
                task.markCancelled();
            } else {
                executeTransfer(currentUserId, sourceConnectionId, sourcePath, targetConnectionId, targetPath, task);
                task.markSuccess();
            }
        } catch (Exception failure) {
            if (failure instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
                task.markCancelled();
            } else {
                task.markError(toSftpErrorMessage(failure, sourcePath, "transfer"));
                log.warn("SFTP transfer task failed: taskId={}, sourceConnectionId={}, sourcePath={}, targetConnectionId={}, targetPath={}, error={}",
                        task.getTaskId(), sourceConnectionId, sourcePath, targetConnectionId, targetPath, failure.getMessage(), failure);
            }
        }
    });
    task.setFuture(handle);
    return ResponseEntity.ok(task.toResponse());
}
/**
 * Returns the current status of one of the caller's transfer tasks.
 *
 * @param taskId         id returned when the task was created
 * @param authentication current user's authentication
 * @return 200 with the status map, or 404 with an {@code error} field
 */
@GetMapping("/transfer-remote/tasks/{taskId}")
public ResponseEntity<Map<String, Object>> getTransferRemoteTask(
        @PathVariable String taskId,
        Authentication authentication) {
    final Long currentUserId = getCurrentUserId(authentication);
    final TransferTaskStatus task = transferTasks.get(transferTaskKey(currentUserId, taskId));
    if (task != null) {
        return ResponseEntity.ok(task.toResponse());
    }
    Map<String, Object> body = new HashMap<>();
    body.put("error", "Transfer task not found");
    return ResponseEntity.status(404).body(body);
}
/**
 * Returns the current status of one of the caller's upload tasks.
 *
 * @param taskId         id returned when the upload was started
 * @param authentication current user's authentication
 * @return 200 with the status map, or 404 with an {@code error} field
 */
@GetMapping("/upload/tasks/{taskId}")
public ResponseEntity<Map<String, Object>> getUploadTask(
        @PathVariable String taskId,
        Authentication authentication) {
    final Long currentUserId = getCurrentUserId(authentication);
    final UploadTaskStatus task = uploadTasks.get(uploadTaskKey(currentUserId, taskId));
    if (task != null) {
        return ResponseEntity.ok(task.toResponse());
    }
    Map<String, Object> body = new HashMap<>();
    body.put("error", "Upload task not found");
    return ResponseEntity.status(404).body(body);
}
/**
 * Requests cancellation of one of the caller's transfer tasks.
 * The response is the task status plus a {@code cancelRequested} flag; when
 * the request was not accepted an explanatory {@code message} is added.
 *
 * @param taskId         id returned when the task was created
 * @param authentication current user's authentication
 * @return 200 with the status map, or 404 with an {@code error} field
 */
@DeleteMapping("/transfer-remote/tasks/{taskId}")
public ResponseEntity<Map<String, Object>> cancelTransferRemoteTask(
        @PathVariable String taskId,
        Authentication authentication) {
    final Long currentUserId = getCurrentUserId(authentication);
    final TransferTaskStatus task = transferTasks.get(transferTaskKey(currentUserId, taskId));
    if (task == null) {
        Map<String, Object> notFound = new HashMap<>();
        notFound.put("error", "Transfer task not found");
        return ResponseEntity.status(404).body(notFound);
    }
    final boolean accepted = task.cancel();
    final Map<String, Object> body = task.toResponse();
    body.put("cancelRequested", accepted);
    if (!accepted) {
        body.put("message", "Task already running or finished; current transfer cannot be interrupted safely");
    }
    return ResponseEntity.ok(body);
}
/**
 * Opens a server-sent-event stream for a transfer task.
 * For an unknown task a single {@code error} event is sent and the stream is
 * completed. Otherwise the emitter is registered under the task key, removed
 * again on completion/timeout/error, and primed with the current status.
 *
 * @param taskId         id returned when the task was created
 * @param authentication current user's authentication
 * @return the emitter, created with a 300000 ms timeout
 */
@GetMapping("/transfer-remote/tasks/{taskId}/progress")
public SseEmitter streamTransferProgress(
        @PathVariable String taskId,
        Authentication authentication) {
    final Long currentUserId = getCurrentUserId(authentication);
    final String emitterKey = transferTaskKey(currentUserId, taskId);
    final TransferTaskStatus task = transferTasks.get(emitterKey);
    final SseEmitter stream = new SseEmitter(300000L); // 5 minutes timeout
    if (task == null) {
        try {
            Map<String, String> notFound = new HashMap<>();
            notFound.put("error", "Task not found");
            stream.send(SseEmitter.event().name("error").data(notFound));
            stream.complete();
        } catch (IOException sendFailure) {
            stream.completeWithError(sendFailure);
        }
        return stream;
    }
    CopyOnWriteArrayList<SseEmitter> subscribers =
            taskEmitters.computeIfAbsent(emitterKey, unused -> new CopyOnWriteArrayList<>());
    subscribers.add(stream);
    final Runnable unregister = () -> removeEmitter(emitterKey, stream);
    stream.onCompletion(unregister);
    stream.onTimeout(unregister);
    stream.onError(cause -> removeEmitter(emitterKey, stream));
    // Push the current state right away so the client does not wait for the next update.
    try {
        stream.send(SseEmitter.event().name("progress").data(task.toResponse()));
    } catch (IOException sendFailure) {
        removeEmitter(emitterKey, stream);
    }
    return stream;
}
/**
 * Opens a server-sent-event stream for an upload task.
 * For an unknown task a single {@code error} event is sent and the stream is
 * completed. Otherwise the emitter is registered under the task key, removed
 * again on completion/timeout/error, and primed with the current status.
 *
 * @param taskId         id returned when the upload was started
 * @param authentication current user's authentication
 * @return the emitter, created with a 300000 ms timeout
 */
@GetMapping("/upload/tasks/{taskId}/progress")
public SseEmitter streamUploadProgress(
        @PathVariable String taskId,
        Authentication authentication) {
    final Long currentUserId = getCurrentUserId(authentication);
    final String emitterKey = uploadTaskKey(currentUserId, taskId);
    final UploadTaskStatus task = uploadTasks.get(emitterKey);
    final SseEmitter stream = new SseEmitter(300000L); // 5 minutes timeout
    if (task == null) {
        try {
            Map<String, String> notFound = new HashMap<>();
            notFound.put("error", "Task not found");
            stream.send(SseEmitter.event().name("error").data(notFound));
            stream.complete();
        } catch (IOException sendFailure) {
            stream.completeWithError(sendFailure);
        }
        return stream;
    }
    CopyOnWriteArrayList<SseEmitter> subscribers =
            taskEmitters.computeIfAbsent(emitterKey, unused -> new CopyOnWriteArrayList<>());
    subscribers.add(stream);
    final Runnable unregister = () -> removeEmitter(emitterKey, stream);
    stream.onCompletion(unregister);
    stream.onTimeout(unregister);
    stream.onError(cause -> removeEmitter(emitterKey, stream));
    // Push the current state right away so the client does not wait for the next update.
    try {
        stream.send(SseEmitter.event().name("progress").data(task.toResponse()));
    } catch (IOException sendFailure) {
        removeEmitter(emitterKey, stream);
    }
    return stream;
}
/**
 * Unregisters an emitter from a task and drops the task's list once it is empty.
 *
 * @param taskKey key the emitter was registered under
 * @param emitter emitter to remove
 */
private void removeEmitter(String taskKey, SseEmitter emitter) {
    final CopyOnWriteArrayList<SseEmitter> subscribers = taskEmitters.get(taskKey);
    if (subscribers == null) {
        return;
    }
    subscribers.remove(emitter);
    if (subscribers.isEmpty()) {
        taskEmitters.remove(taskKey);
    }
}
/**
 * Sends a {@code progress} event to every emitter registered for the task.
 * Emitters whose send throws are collected and unregistered afterwards.
 *
 * @param taskKey key of the task whose subscribers are notified
 * @param data    payload of the event
 */
private void broadcastProgress(String taskKey, Map<String, Object> data) {
    final CopyOnWriteArrayList<SseEmitter> subscribers = taskEmitters.get(taskKey);
    if (subscribers == null || subscribers.isEmpty()) {
        return;
    }
    final List<SseEmitter> broken = new java.util.ArrayList<>();
    for (SseEmitter subscriber : subscribers) {
        try {
            subscriber.send(SseEmitter.event().name("progress").data(data));
        } catch (Exception sendFailure) {
            broken.add(subscriber);
        }
    }
    for (SseEmitter subscriber : broken) {
        removeEmitter(taskKey, subscriber);
    }
}
/**
 * Closes and forgets the caller's cached session for a connection, together
 * with its lock object and last-access record.
 *
 * @param connectionId   id of the stored connection
 * @param authentication current user's authentication
 * @return 200 with {@code {"message": "Disconnected"}}
 */
@PostMapping("/disconnect")
public ResponseEntity<Map<String, String>> disconnect(
        @RequestParam Long connectionId,
        Authentication authentication) {
    final String key = sessionKey(getCurrentUserId(authentication), connectionId);
    final SftpService.SftpSession cached = sessions.remove(key);
    if (cached != null) {
        cached.disconnect();
    }
    // NOTE(review): this runs without taking the session lock and removes the lock
    // object itself; confirm that is acceptable while an operation is in flight.
    sessionLocks.remove(key);
    cleanupTask.removeSession(key);
    Map<String, String> body = new HashMap<>();
    body.put("message", "Disconnected");
    return ResponseEntity.ok(body);
}
/**
 * Disconnects and forgets every session whose last recorded access is older
 * than the given timeout.
 *
 * @param timeoutMinutes idle time in minutes after which a session counts as expired
 */
public void cleanupExpiredSessions(int timeoutMinutes) {
    for (String expiredKey : cleanupTask.getExpiredSessions(timeoutMinutes)) {
        final SftpService.SftpSession idle = sessions.remove(expiredKey);
        if (idle != null) {
            idle.disconnect();
        }
        sessionLocks.remove(expiredKey);
        cleanupTask.removeSession(expiredKey);
        log.info("Cleaned up expired SFTP session: {}", expiredKey);
    }
}
/**
 * Removes finished background tasks that ended longer ago than the timeout.
 * Both remote-transfer tasks and upload tasks are purged; upload tasks are
 * stored in their own map and would otherwise never be removed. Tasks that
 * are still queued or running are kept (see each status class's
 * {@code isExpired}).
 *
 * @param timeoutMinutes retention time in minutes for finished tasks
 */
public void cleanupExpiredTransferTasks(int timeoutMinutes) {
    long now = System.currentTimeMillis();
    long timeoutMillis = timeoutMinutes * 60L * 1000L;
    transferTasks.entrySet().removeIf(entry -> entry.getValue().isExpired(now, timeoutMillis));
    uploadTasks.entrySet().removeIf(entry -> entry.getValue().isExpired(now, timeoutMillis));
}
// Tracks the last access time per session key; consulted by cleanupExpiredSessions.
private final SftpSessionExpiryCleanup cleanupTask = new SftpSessionExpiryCleanup();
/**
 * Bookkeeping of when each session key was last used, backed by a
 * {@link ConcurrentHashMap}.
 */
public static class SftpSessionExpiryCleanup {
    private final Map<String, Long> lastAccessTime = new ConcurrentHashMap<>();

    /** Stamps {@code key} with the current time. */
    public void recordAccess(String key) {
        final long stamp = System.currentTimeMillis();
        lastAccessTime.put(key, stamp);
    }

    /** Forgets the access record of {@code key}. */
    public void removeSession(String key) {
        lastAccessTime.remove(key);
    }

    /**
     * Returns the keys whose last access lies more than the timeout in the past.
     *
     * @param timeoutMinutes idle threshold in minutes
     * @return keys idle for strictly longer than the threshold
     */
    public List<String> getExpiredSessions(long timeoutMinutes) {
        final long now = System.currentTimeMillis();
        final long timeoutMillis = timeoutMinutes * 60 * 1000;
        final List<String> expiredKeys = new java.util.ArrayList<>();
        for (Map.Entry<String, Long> record : lastAccessTime.entrySet()) {
            long idleMillis = now - record.getValue();
            if (idleMillis > timeoutMillis) {
                expiredKeys.add(record.getKey());
            }
        }
        return expiredKeys;
    }
}
/**
 * State of one remote-to-remote transfer task.
 * Mutable fields are {@code volatile} or atomic; every state change is pushed
 * to the owning controller's progress subscribers when a controller is set.
 */
public static class TransferTaskStatus {
    private final String taskId;
    private final Long userId;
    private final Long sourceConnectionId;
    private final Long targetConnectionId;
    private final String sourcePath;
    private final String targetPath;
    private final long createdAt;
    private final AtomicLong totalBytes;
    private final AtomicLong transferredBytes;
    private volatile String status;
    private volatile String error;
    private volatile long startedAt;
    private volatile long finishedAt;
    private volatile Future<?> future;
    private volatile SftpController controller;

    /** Creates a task in the {@code "queued"} state with zeroed byte counters. */
    public TransferTaskStatus(String taskId,
                              Long userId,
                              Long sourceConnectionId,
                              Long targetConnectionId,
                              String sourcePath,
                              String targetPath) {
        this.taskId = taskId;
        this.userId = userId;
        this.sourceConnectionId = sourceConnectionId;
        this.targetConnectionId = targetConnectionId;
        this.sourcePath = sourcePath;
        this.targetPath = targetPath;
        this.totalBytes = new AtomicLong(0);
        this.transferredBytes = new AtomicLong(0);
        this.status = "queued";
        this.createdAt = System.currentTimeMillis();
    }

    public String getTaskId() {
        return taskId;
    }

    /** Sets the controller that receives progress broadcasts. */
    public void setController(SftpController controller) {
        this.controller = controller;
    }

    /** Sets the handle used by {@link #cancel()}. */
    public void setFuture(Future<?> future) {
        this.future = future;
    }

    /** Sets the status; the first switch to {@code "running"} stamps the start time. */
    public void setStatus(String status) {
        this.status = status;
        if (startedAt == 0 && "running".equals(status)) {
            startedAt = System.currentTimeMillis();
        }
        notifyProgress();
    }

    /** Stores the byte counters (negative values become 0) and stamps the start time if unset. */
    public void setProgress(long transferred, long total) {
        if (startedAt == 0) {
            startedAt = System.currentTimeMillis();
        }
        transferredBytes.set(transferred < 0 ? 0 : transferred);
        totalBytes.set(total < 0 ? 0 : total);
        notifyProgress();
    }

    /** Marks the task successful; a known total is copied into the transferred counter. */
    public void markSuccess() {
        final long knownTotal = totalBytes.get();
        if (knownTotal > 0) {
            transferredBytes.set(knownTotal);
        }
        status = "success";
        finishedAt = System.currentTimeMillis();
        notifyProgress();
    }

    /** Marks the task failed with the given message. */
    public void markError(String message) {
        status = "error";
        error = message;
        finishedAt = System.currentTimeMillis();
        notifyProgress();
    }

    /** Marks the task cancelled. */
    public void markCancelled() {
        status = "cancelled";
        finishedAt = System.currentTimeMillis();
        notifyProgress();
    }

    private void notifyProgress() {
        final SftpController owner = controller;
        if (owner == null) {
            return;
        }
        owner.broadcastProgress(owner.transferTaskKey(userId, taskId), toResponse());
    }

    /**
     * Cancels the task if it is queued or running: the future (if set) is
     * cancelled with interruption and the task is marked cancelled.
     *
     * @return {@code true} if the task was queued or running, {@code false} otherwise
     */
    public boolean cancel() {
        final boolean active = "queued".equals(status) || "running".equals(status);
        if (!active) {
            return false;
        }
        final Future<?> handle = future;
        if (handle != null) {
            handle.cancel(true);
        }
        markCancelled();
        return true;
    }

    /**
     * Builds the map returned to clients. {@code progress} is a percentage
     * capped at 100; with an unknown total it is 100 for a finished task and 0 otherwise.
     */
    public Map<String, Object> toResponse() {
        final long total = totalBytes.get();
        final long transferred = transferredBytes.get();
        final int progress;
        if (total > 0) {
            progress = (int) Math.min(100, Math.round((transferred * 100.0) / total));
        } else {
            boolean finished = "success".equals(status) || "error".equals(status) || "cancelled".equals(status);
            progress = finished ? 100 : 0;
        }
        final Map<String, Object> view = new HashMap<>();
        view.put("taskId", taskId);
        view.put("userId", userId);
        view.put("sourceConnectionId", sourceConnectionId);
        view.put("targetConnectionId", targetConnectionId);
        view.put("sourcePath", sourcePath);
        view.put("targetPath", targetPath);
        view.put("status", status);
        view.put("progress", progress);
        view.put("transferredBytes", transferred);
        view.put("totalBytes", total);
        view.put("createdAt", createdAt);
        view.put("startedAt", startedAt);
        view.put("finishedAt", finishedAt);
        if (error != null && !error.isEmpty()) {
            view.put("error", error);
        }
        return view;
    }

    /**
     * Tells whether a finished task ended longer ago than {@code timeoutMillis}.
     * Queued and running tasks never expire; a task without a finish time is
     * measured from its creation time.
     */
    public boolean isExpired(long now, long timeoutMillis) {
        if ("queued".equals(status) || "running".equals(status)) {
            return false;
        }
        final long reference = finishedAt > 0 ? finishedAt : createdAt;
        return now - reference > timeoutMillis;
    }
}
/**
 * State of one upload task.
 * Mutable fields are {@code volatile} or atomic; every state change is pushed
 * to the owning controller's progress subscribers when a controller is set.
 */
public static class UploadTaskStatus {
    private final String taskId;
    private final Long userId;
    private final Long connectionId;
    private final String path;
    private final String filename;
    private final long fileSize;
    private final long createdAt;
    private final AtomicLong totalBytes;
    private final AtomicLong transferredBytes;
    private volatile String status;
    private volatile String error;
    private volatile long startedAt;
    private volatile long finishedAt;
    private volatile Future<?> future;
    private volatile SftpController controller;

    /** Creates a task in the {@code "queued"} state whose total is the given file size. */
    public UploadTaskStatus(String taskId, Long userId, Long connectionId,
                            String path, String filename, long fileSize) {
        this.taskId = taskId;
        this.userId = userId;
        this.connectionId = connectionId;
        this.path = path;
        this.filename = filename;
        this.fileSize = fileSize;
        this.totalBytes = new AtomicLong(fileSize);
        this.transferredBytes = new AtomicLong(0);
        this.status = "queued";
        this.createdAt = System.currentTimeMillis();
    }

    public long getTotalBytes() {
        return totalBytes.get();
    }

    /** Sets the controller that receives progress broadcasts. */
    public void setController(SftpController controller) {
        this.controller = controller;
    }

    /** Stores the handle of the background task. */
    public void setFuture(Future<?> future) {
        this.future = future;
    }

    /** Sets the status; the first switch to {@code "running"} stamps the start time. */
    public void setStatus(String status) {
        this.status = status;
        if (startedAt == 0 && "running".equals(status)) {
            startedAt = System.currentTimeMillis();
        }
        notifyProgress();
    }

    /**
     * Stores the transferred counter (negative values become 0). The total is
     * only replaced when a positive value is supplied, so the size given at
     * construction survives listeners that report an unknown total.
     */
    public void setProgress(long transferred, long total) {
        if (startedAt == 0) {
            startedAt = System.currentTimeMillis();
        }
        transferredBytes.set(transferred < 0 ? 0 : transferred);
        if (total > 0) {
            totalBytes.set(Math.max(0, total));
        }
        notifyProgress();
    }

    /** Marks the task successful; a known total is copied into the transferred counter. */
    public void markSuccess() {
        final long knownTotal = totalBytes.get();
        if (knownTotal > 0) {
            transferredBytes.set(knownTotal);
        }
        status = "success";
        finishedAt = System.currentTimeMillis();
        notifyProgress();
    }

    /** Marks the task failed with the given message. */
    public void markError(String message) {
        status = "error";
        error = message;
        finishedAt = System.currentTimeMillis();
        notifyProgress();
    }

    private void notifyProgress() {
        final SftpController owner = controller;
        if (owner == null) {
            return;
        }
        owner.broadcastProgress(owner.uploadTaskKey(userId, taskId), toResponse());
    }

    /**
     * Builds the map returned to clients. {@code progress} is a percentage
     * capped at 100; with an unknown total it is 100 for a task in the
     * success or error state and 0 otherwise.
     */
    public Map<String, Object> toResponse() {
        final long total = totalBytes.get();
        final long transferred = transferredBytes.get();
        final int progress;
        if (total > 0) {
            progress = (int) Math.min(100, Math.round((transferred * 100.0) / total));
        } else {
            boolean finished = "success".equals(status) || "error".equals(status);
            progress = finished ? 100 : 0;
        }
        final Map<String, Object> view = new HashMap<>();
        view.put("taskId", taskId);
        view.put("status", status);
        view.put("progress", progress);
        view.put("transferredBytes", transferred);
        view.put("totalBytes", total);
        view.put("filename", filename);
        view.put("createdAt", createdAt);
        view.put("startedAt", startedAt);
        view.put("finishedAt", finishedAt);
        if (error != null && !error.isEmpty()) {
            view.put("error", error);
        }
        return view;
    }

    /**
     * Tells whether a finished task ended longer ago than {@code timeoutMillis}.
     * Queued and running tasks never expire; a task without a finish time is
     * measured from its creation time.
     */
    public boolean isExpired(long now, long timeoutMillis) {
        if ("queued".equals(status) || "running".equals(status)) {
            return false;
        }
        final long reference = finishedAt > 0 ? finishedAt : createdAt;
        return now - reference > timeoutMillis;
    }
}
}