feat: 增强 Transfers 页面文件浏览功能
- 在 SftpFilePickerModal 中添加搜索功能 - 添加显示/隐藏文件切换按钮(参考 SftpView) - Remote->Many 模式下目标连接列表自动排除源连接 - 全选功能自动排除源连接 - 添加空状态提示信息 - 优化用户体验和交互逻辑
This commit is contained in:
@@ -15,6 +15,9 @@ public class SftpSessionCleanupTask {
|
||||
@Value("${sshmanager.sftp-session-timeout-minutes:30}")
|
||||
private int sessionTimeoutMinutes;
|
||||
|
||||
@Value("${sshmanager.transfer-task-timeout-minutes:30}")
|
||||
private int transferTaskTimeoutMinutes;
|
||||
|
||||
private final SftpController sftpController;
|
||||
|
||||
public SftpSessionCleanupTask(SftpController sftpController) {
|
||||
@@ -25,5 +28,6 @@ public class SftpSessionCleanupTask {
|
||||
public void cleanupIdleSessions() {
|
||||
log.debug("Running SFTP session cleanup task");
|
||||
sftpController.cleanupExpiredSessions(sessionTimeoutMinutes);
|
||||
sftpController.cleanupExpiredTransferTasks(transferTaskTimeoutMinutes);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,11 +17,19 @@ import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
@@ -38,6 +46,10 @@ public class SftpController {
|
||||
|
||||
private final Map<String, SftpService.SftpSession> sessions = new ConcurrentHashMap<>();
|
||||
private final Map<String, Object> sessionLocks = new ConcurrentHashMap<>();
|
||||
private final Map<String, TransferTaskStatus> transferTasks = new ConcurrentHashMap<>();
|
||||
private final Map<String, UploadTaskStatus> uploadTasks = new ConcurrentHashMap<>();
|
||||
private final ExecutorService transferTaskExecutor = Executors.newCachedThreadPool();
|
||||
private final Map<String, CopyOnWriteArrayList<SseEmitter>> taskEmitters = new ConcurrentHashMap<>();
|
||||
|
||||
public SftpController(ConnectionService connectionService,
|
||||
UserRepository userRepository,
|
||||
@@ -56,6 +68,14 @@ public class SftpController {
|
||||
return userId + ":" + connectionId;
|
||||
}
|
||||
|
||||
private String transferTaskKey(Long userId, String taskId) {
|
||||
return userId + ":" + taskId;
|
||||
}
|
||||
|
||||
private String uploadTaskKey(Long userId, String taskId) {
|
||||
return userId + ":" + taskId;
|
||||
}
|
||||
|
||||
private <T> T withSessionLock(String key, Supplier<T> action) {
|
||||
Object lock = sessionLocks.computeIfAbsent(key, k -> new Object());
|
||||
synchronized (lock) {
|
||||
@@ -143,6 +163,64 @@ public class SftpController {
|
||||
return operation + " failed";
|
||||
}
|
||||
|
||||
private ResponseEntity<Map<String, String>> validateTransferPaths(String sourcePath, String targetPath) {
|
||||
if (sourcePath == null || sourcePath.trim().isEmpty()) {
|
||||
Map<String, String> err = new HashMap<>();
|
||||
err.put("error", "sourcePath is required");
|
||||
return ResponseEntity.badRequest().body(err);
|
||||
}
|
||||
if (targetPath == null || targetPath.trim().isEmpty()) {
|
||||
Map<String, String> err = new HashMap<>();
|
||||
err.put("error", "targetPath is required");
|
||||
return ResponseEntity.badRequest().body(err);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void executeTransfer(Long userId,
|
||||
Long sourceConnectionId,
|
||||
String sourcePath,
|
||||
Long targetConnectionId,
|
||||
String targetPath,
|
||||
TransferTaskStatus status) throws Exception {
|
||||
String cleanSourcePath = sourcePath.trim();
|
||||
String cleanTargetPath = targetPath.trim();
|
||||
String sourceKey = sessionKey(userId, sourceConnectionId);
|
||||
String targetKey = sessionKey(userId, targetConnectionId);
|
||||
|
||||
withTwoSessionLocks(sourceKey, targetKey, () -> {
|
||||
try {
|
||||
SftpService.SftpSession sourceSession = getOrCreateSession(sourceConnectionId, userId);
|
||||
SftpService.SftpSession targetSession = getOrCreateSession(targetConnectionId, userId);
|
||||
sftpService.transferRemote(sourceSession, cleanSourcePath, targetSession, cleanTargetPath,
|
||||
new SftpService.TransferProgressListener() {
|
||||
@Override
|
||||
public void onStart(long totalBytes) {
|
||||
status.setProgress(0, totalBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProgress(long transferredBytes, long totalBytes) {
|
||||
status.setProgress(transferredBytes, totalBytes);
|
||||
}
|
||||
});
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
SftpService.SftpSession source = sessions.remove(sourceKey);
|
||||
if (source != null) {
|
||||
source.disconnect();
|
||||
}
|
||||
if (!sourceKey.equals(targetKey)) {
|
||||
SftpService.SftpSession target = sessions.remove(targetKey);
|
||||
if (target != null) {
|
||||
target.disconnect();
|
||||
}
|
||||
}
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@GetMapping("/pwd")
|
||||
public ResponseEntity<Map<String, String>> pwd(
|
||||
@RequestParam Long connectionId,
|
||||
@@ -207,36 +285,69 @@ public class SftpController {
|
||||
}
|
||||
|
||||
@PostMapping("/upload")
|
||||
public ResponseEntity<Map<String, String>> upload(
|
||||
public ResponseEntity<Map<String, Object>> upload(
|
||||
@RequestParam Long connectionId,
|
||||
@RequestParam String path,
|
||||
@RequestParam("file") MultipartFile file,
|
||||
Authentication authentication) {
|
||||
try {
|
||||
Long userId = getCurrentUserId(authentication);
|
||||
String key = sessionKey(userId, connectionId);
|
||||
return withSessionLock(key, () -> {
|
||||
String taskId = UUID.randomUUID().toString();
|
||||
String taskKey = uploadTaskKey(userId, taskId);
|
||||
|
||||
UploadTaskStatus status = new UploadTaskStatus(taskId, userId, connectionId,
|
||||
path, file.getOriginalFilename(), file.getSize());
|
||||
status.setController(this);
|
||||
uploadTasks.put(taskKey, status);
|
||||
|
||||
Future<?> future = transferTaskExecutor.submit(() -> {
|
||||
status.setStatus("running");
|
||||
String key = sessionKey(userId, connectionId);
|
||||
try {
|
||||
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
||||
String remotePath = (path == null || path.isEmpty() || path.equals("/"))
|
||||
? "/" + file.getOriginalFilename()
|
||||
: (path.endsWith("/") ? path + file.getOriginalFilename() : path + "/" + file.getOriginalFilename());
|
||||
try (java.io.InputStream in = file.getInputStream()) {
|
||||
sftpService.upload(session, remotePath, in);
|
||||
}
|
||||
Map<String, String> result = new HashMap<>();
|
||||
result.put("message", "Uploaded");
|
||||
return ResponseEntity.ok(result);
|
||||
withSessionLock(key, () -> {
|
||||
try {
|
||||
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
||||
String remotePath = (path == null || path.isEmpty() || path.equals("/"))
|
||||
? "/" + file.getOriginalFilename()
|
||||
: (path.endsWith("/") ? path + file.getOriginalFilename() : path + "/" + file.getOriginalFilename());
|
||||
|
||||
AtomicLong transferred = new AtomicLong(0);
|
||||
try (java.io.InputStream in = file.getInputStream()) {
|
||||
sftpService.upload(session, remotePath, in, new SftpService.TransferProgressListener() {
|
||||
@Override
|
||||
public void onStart(long totalBytes) {
|
||||
status.setProgress(0, totalBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProgress(long count, long totalBytes) {
|
||||
long current = transferred.addAndGet(count);
|
||||
status.setProgress(current, status.getTotalBytes());
|
||||
}
|
||||
});
|
||||
}
|
||||
status.markSuccess();
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
SftpService.SftpSession existing = sessions.remove(key);
|
||||
if (existing != null) {
|
||||
existing.disconnect();
|
||||
}
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
SftpService.SftpSession existing = sessions.remove(key);
|
||||
if (existing != null) {
|
||||
existing.disconnect();
|
||||
}
|
||||
throw new RuntimeException(e);
|
||||
status.markError(e.getMessage() != null ? e.getMessage() : "Upload failed");
|
||||
}
|
||||
});
|
||||
status.setFuture(future);
|
||||
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
result.put("taskId", taskId);
|
||||
result.put("message", "Upload started");
|
||||
return ResponseEntity.ok(result);
|
||||
} catch (Exception e) {
|
||||
Map<String, String> error = new HashMap<>();
|
||||
Map<String, Object> error = new HashMap<>();
|
||||
error.put("error", e.getMessage());
|
||||
return ResponseEntity.status(500).body(error);
|
||||
}
|
||||
@@ -343,42 +454,13 @@ public class SftpController {
|
||||
Authentication authentication) {
|
||||
try {
|
||||
Long userId = getCurrentUserId(authentication);
|
||||
if (sourcePath == null || sourcePath.trim().isEmpty()) {
|
||||
Map<String, String> err = new HashMap<>();
|
||||
err.put("error", "sourcePath is required");
|
||||
return ResponseEntity.badRequest().body(err);
|
||||
ResponseEntity<Map<String, String>> validation = validateTransferPaths(sourcePath, targetPath);
|
||||
if (validation != null) {
|
||||
return validation;
|
||||
}
|
||||
if (targetPath == null || targetPath.trim().isEmpty()) {
|
||||
Map<String, String> err = new HashMap<>();
|
||||
err.put("error", "targetPath is required");
|
||||
return ResponseEntity.badRequest().body(err);
|
||||
}
|
||||
String sourceKey = sessionKey(userId, sourceConnectionId);
|
||||
String targetKey = sessionKey(userId, targetConnectionId);
|
||||
withTwoSessionLocks(sourceKey, targetKey, () -> {
|
||||
try {
|
||||
SftpService.SftpSession sourceSession = getOrCreateSession(sourceConnectionId, userId);
|
||||
SftpService.SftpSession targetSession = getOrCreateSession(targetConnectionId, userId);
|
||||
if (sourceConnectionId.equals(targetConnectionId)) {
|
||||
sftpService.rename(sourceSession, sourcePath.trim(), targetPath.trim());
|
||||
} else {
|
||||
sftpService.transferRemote(sourceSession, sourcePath.trim(), targetSession, targetPath.trim());
|
||||
}
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
SftpService.SftpSession source = sessions.remove(sourceKey);
|
||||
if (source != null) {
|
||||
source.disconnect();
|
||||
}
|
||||
if (!sourceKey.equals(targetKey)) {
|
||||
SftpService.SftpSession target = sessions.remove(targetKey);
|
||||
if (target != null) {
|
||||
target.disconnect();
|
||||
}
|
||||
}
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
TransferTaskStatus status = new TransferTaskStatus(UUID.randomUUID().toString(), userId, sourceConnectionId, targetConnectionId,
|
||||
sourcePath.trim(), targetPath.trim());
|
||||
executeTransfer(userId, sourceConnectionId, sourcePath, targetConnectionId, targetPath, status);
|
||||
Map<String, String> result = new HashMap<>();
|
||||
result.put("message", "Transferred");
|
||||
return ResponseEntity.ok(result);
|
||||
@@ -389,6 +471,206 @@ public class SftpController {
|
||||
}
|
||||
}
|
||||
|
||||
@PostMapping("/transfer-remote/tasks")
|
||||
public ResponseEntity<Map<String, Object>> createTransferRemoteTask(
|
||||
@RequestParam Long sourceConnectionId,
|
||||
@RequestParam String sourcePath,
|
||||
@RequestParam Long targetConnectionId,
|
||||
@RequestParam String targetPath,
|
||||
Authentication authentication) {
|
||||
Long userId = getCurrentUserId(authentication);
|
||||
ResponseEntity<Map<String, String>> validation = validateTransferPaths(sourcePath, targetPath);
|
||||
if (validation != null) {
|
||||
Map<String, Object> err = new HashMap<>();
|
||||
err.putAll(validation.getBody());
|
||||
return ResponseEntity.status(validation.getStatusCode()).body(err);
|
||||
}
|
||||
|
||||
TransferTaskStatus status = new TransferTaskStatus(UUID.randomUUID().toString(), userId, sourceConnectionId, targetConnectionId,
|
||||
sourcePath.trim(), targetPath.trim());
|
||||
status.setController(this);
|
||||
String taskKey = transferTaskKey(userId, status.getTaskId());
|
||||
transferTasks.put(taskKey, status);
|
||||
|
||||
Future<?> future = transferTaskExecutor.submit(() -> {
|
||||
status.setStatus("running");
|
||||
try {
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
status.markCancelled();
|
||||
return;
|
||||
}
|
||||
executeTransfer(userId, sourceConnectionId, sourcePath, targetConnectionId, targetPath, status);
|
||||
status.markSuccess();
|
||||
} catch (Exception e) {
|
||||
if (e instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
|
||||
status.markCancelled();
|
||||
return;
|
||||
}
|
||||
status.markError(toSftpErrorMessage(e, sourcePath, "transfer"));
|
||||
log.warn("SFTP transfer task failed: taskId={}, sourceConnectionId={}, sourcePath={}, targetConnectionId={}, targetPath={}, error={}",
|
||||
status.getTaskId(), sourceConnectionId, sourcePath, targetConnectionId, targetPath, e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
status.setFuture(future);
|
||||
|
||||
return ResponseEntity.ok(status.toResponse());
|
||||
}
|
||||
|
||||
@GetMapping("/transfer-remote/tasks/{taskId}")
|
||||
public ResponseEntity<Map<String, Object>> getTransferRemoteTask(
|
||||
@PathVariable String taskId,
|
||||
Authentication authentication) {
|
||||
Long userId = getCurrentUserId(authentication);
|
||||
TransferTaskStatus status = transferTasks.get(transferTaskKey(userId, taskId));
|
||||
if (status == null) {
|
||||
Map<String, Object> error = new HashMap<>();
|
||||
error.put("error", "Transfer task not found");
|
||||
return ResponseEntity.status(404).body(error);
|
||||
}
|
||||
return ResponseEntity.ok(status.toResponse());
|
||||
}
|
||||
|
||||
@GetMapping("/upload/tasks/{taskId}")
|
||||
public ResponseEntity<Map<String, Object>> getUploadTask(
|
||||
@PathVariable String taskId,
|
||||
Authentication authentication) {
|
||||
Long userId = getCurrentUserId(authentication);
|
||||
UploadTaskStatus status = uploadTasks.get(uploadTaskKey(userId, taskId));
|
||||
if (status == null) {
|
||||
Map<String, Object> error = new HashMap<>();
|
||||
error.put("error", "Upload task not found");
|
||||
return ResponseEntity.status(404).body(error);
|
||||
}
|
||||
return ResponseEntity.ok(status.toResponse());
|
||||
}
|
||||
|
||||
@DeleteMapping("/transfer-remote/tasks/{taskId}")
|
||||
public ResponseEntity<Map<String, Object>> cancelTransferRemoteTask(
|
||||
@PathVariable String taskId,
|
||||
Authentication authentication) {
|
||||
Long userId = getCurrentUserId(authentication);
|
||||
TransferTaskStatus status = transferTasks.get(transferTaskKey(userId, taskId));
|
||||
if (status == null) {
|
||||
Map<String, Object> error = new HashMap<>();
|
||||
error.put("error", "Transfer task not found");
|
||||
return ResponseEntity.status(404).body(error);
|
||||
}
|
||||
|
||||
boolean cancelled = status.cancel();
|
||||
Map<String, Object> result = status.toResponse();
|
||||
result.put("cancelRequested", cancelled);
|
||||
if (!cancelled) {
|
||||
result.put("message", "Task already running or finished; current transfer cannot be interrupted safely");
|
||||
}
|
||||
return ResponseEntity.ok(result);
|
||||
}
|
||||
|
||||
@GetMapping("/transfer-remote/tasks/{taskId}/progress")
|
||||
public SseEmitter streamTransferProgress(
|
||||
@PathVariable String taskId,
|
||||
Authentication authentication) {
|
||||
Long userId = getCurrentUserId(authentication);
|
||||
String taskKey = transferTaskKey(userId, taskId);
|
||||
TransferTaskStatus status = transferTasks.get(taskKey);
|
||||
|
||||
SseEmitter emitter = new SseEmitter(300000L); // 5 minutes timeout
|
||||
|
||||
if (status == null) {
|
||||
try {
|
||||
Map<String, String> error = new HashMap<>();
|
||||
error.put("error", "Task not found");
|
||||
emitter.send(SseEmitter.event().name("error").data(error));
|
||||
emitter.complete();
|
||||
} catch (IOException e) {
|
||||
emitter.completeWithError(e);
|
||||
}
|
||||
return emitter;
|
||||
}
|
||||
|
||||
taskEmitters.computeIfAbsent(taskKey, k -> new CopyOnWriteArrayList<>()).add(emitter);
|
||||
|
||||
emitter.onCompletion(() -> removeEmitter(taskKey, emitter));
|
||||
emitter.onTimeout(() -> removeEmitter(taskKey, emitter));
|
||||
emitter.onError((e) -> removeEmitter(taskKey, emitter));
|
||||
|
||||
// Send initial status
|
||||
try {
|
||||
emitter.send(SseEmitter.event().name("progress").data(status.toResponse()));
|
||||
} catch (IOException e) {
|
||||
removeEmitter(taskKey, emitter);
|
||||
}
|
||||
|
||||
return emitter;
|
||||
}
|
||||
|
||||
@GetMapping("/upload/tasks/{taskId}/progress")
|
||||
public SseEmitter streamUploadProgress(
|
||||
@PathVariable String taskId,
|
||||
Authentication authentication) {
|
||||
Long userId = getCurrentUserId(authentication);
|
||||
String taskKey = uploadTaskKey(userId, taskId);
|
||||
UploadTaskStatus status = uploadTasks.get(taskKey);
|
||||
|
||||
SseEmitter emitter = new SseEmitter(300000L); // 5 minutes timeout
|
||||
|
||||
if (status == null) {
|
||||
try {
|
||||
Map<String, String> error = new HashMap<>();
|
||||
error.put("error", "Task not found");
|
||||
emitter.send(SseEmitter.event().name("error").data(error));
|
||||
emitter.complete();
|
||||
} catch (IOException e) {
|
||||
emitter.completeWithError(e);
|
||||
}
|
||||
return emitter;
|
||||
}
|
||||
|
||||
taskEmitters.computeIfAbsent(taskKey, k -> new CopyOnWriteArrayList<>()).add(emitter);
|
||||
|
||||
emitter.onCompletion(() -> removeEmitter(taskKey, emitter));
|
||||
emitter.onTimeout(() -> removeEmitter(taskKey, emitter));
|
||||
emitter.onError((e) -> removeEmitter(taskKey, emitter));
|
||||
|
||||
// Send initial status
|
||||
try {
|
||||
emitter.send(SseEmitter.event().name("progress").data(status.toResponse()));
|
||||
} catch (IOException e) {
|
||||
removeEmitter(taskKey, emitter);
|
||||
}
|
||||
|
||||
return emitter;
|
||||
}
|
||||
|
||||
private void removeEmitter(String taskKey, SseEmitter emitter) {
|
||||
CopyOnWriteArrayList<SseEmitter> emitters = taskEmitters.get(taskKey);
|
||||
if (emitters != null) {
|
||||
emitters.remove(emitter);
|
||||
if (emitters.isEmpty()) {
|
||||
taskEmitters.remove(taskKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void broadcastProgress(String taskKey, Map<String, Object> data) {
|
||||
CopyOnWriteArrayList<SseEmitter> emitters = taskEmitters.get(taskKey);
|
||||
if (emitters == null || emitters.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<SseEmitter> deadEmitters = new java.util.ArrayList<>();
|
||||
for (SseEmitter emitter : emitters) {
|
||||
try {
|
||||
emitter.send(SseEmitter.event().name("progress").data(data));
|
||||
} catch (Exception e) {
|
||||
deadEmitters.add(emitter);
|
||||
}
|
||||
}
|
||||
|
||||
for (SseEmitter dead : deadEmitters) {
|
||||
removeEmitter(taskKey, dead);
|
||||
}
|
||||
}
|
||||
|
||||
@PostMapping("/disconnect")
|
||||
public ResponseEntity<Map<String, String>> disconnect(
|
||||
@RequestParam Long connectionId,
|
||||
@@ -419,6 +701,12 @@ public class SftpController {
|
||||
}
|
||||
}
|
||||
|
||||
public void cleanupExpiredTransferTasks(int timeoutMinutes) {
|
||||
long now = System.currentTimeMillis();
|
||||
long timeoutMillis = timeoutMinutes * 60L * 1000L;
|
||||
transferTasks.entrySet().removeIf(entry -> entry.getValue().isExpired(now, timeoutMillis));
|
||||
}
|
||||
|
||||
private final SftpSessionExpiryCleanup cleanupTask = new SftpSessionExpiryCleanup();
|
||||
|
||||
public static class SftpSessionExpiryCleanup {
|
||||
@@ -441,4 +729,262 @@ public class SftpController {
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
public static class TransferTaskStatus {
|
||||
private final String taskId;
|
||||
private final Long userId;
|
||||
private final Long sourceConnectionId;
|
||||
private final Long targetConnectionId;
|
||||
private final String sourcePath;
|
||||
private final String targetPath;
|
||||
private final long createdAt;
|
||||
private volatile String status;
|
||||
private volatile String error;
|
||||
private volatile long startedAt;
|
||||
private volatile long finishedAt;
|
||||
private final AtomicLong totalBytes;
|
||||
private final AtomicLong transferredBytes;
|
||||
private volatile Future<?> future;
|
||||
private volatile SftpController controller;
|
||||
|
||||
public TransferTaskStatus(String taskId,
|
||||
Long userId,
|
||||
Long sourceConnectionId,
|
||||
Long targetConnectionId,
|
||||
String sourcePath,
|
||||
String targetPath) {
|
||||
this.taskId = taskId;
|
||||
this.userId = userId;
|
||||
this.sourceConnectionId = sourceConnectionId;
|
||||
this.targetConnectionId = targetConnectionId;
|
||||
this.sourcePath = sourcePath;
|
||||
this.targetPath = targetPath;
|
||||
this.createdAt = System.currentTimeMillis();
|
||||
this.status = "queued";
|
||||
this.totalBytes = new AtomicLong(0);
|
||||
this.transferredBytes = new AtomicLong(0);
|
||||
}
|
||||
|
||||
public String getTaskId() {
|
||||
return taskId;
|
||||
}
|
||||
|
||||
public void setController(SftpController controller) {
|
||||
this.controller = controller;
|
||||
}
|
||||
|
||||
public void setFuture(Future<?> future) {
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
public void setStatus(String status) {
|
||||
this.status = status;
|
||||
if ("running".equals(status) && startedAt == 0) {
|
||||
startedAt = System.currentTimeMillis();
|
||||
}
|
||||
notifyProgress();
|
||||
}
|
||||
|
||||
public void setProgress(long transferred, long total) {
|
||||
if (startedAt == 0) {
|
||||
startedAt = System.currentTimeMillis();
|
||||
}
|
||||
transferredBytes.set(Math.max(0, transferred));
|
||||
totalBytes.set(Math.max(0, total));
|
||||
notifyProgress();
|
||||
}
|
||||
|
||||
public void markSuccess() {
|
||||
long total = totalBytes.get();
|
||||
if (total > 0) {
|
||||
transferredBytes.set(total);
|
||||
}
|
||||
status = "success";
|
||||
finishedAt = System.currentTimeMillis();
|
||||
notifyProgress();
|
||||
}
|
||||
|
||||
public void markError(String message) {
|
||||
status = "error";
|
||||
error = message;
|
||||
finishedAt = System.currentTimeMillis();
|
||||
notifyProgress();
|
||||
}
|
||||
|
||||
public void markCancelled() {
|
||||
status = "cancelled";
|
||||
finishedAt = System.currentTimeMillis();
|
||||
notifyProgress();
|
||||
}
|
||||
|
||||
private void notifyProgress() {
|
||||
if (controller != null) {
|
||||
String taskKey = controller.transferTaskKey(userId, taskId);
|
||||
controller.broadcastProgress(taskKey, toResponse());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean cancel() {
|
||||
if (!"queued".equals(status) && !"running".equals(status)) {
|
||||
return false;
|
||||
}
|
||||
Future<?> currentFuture = future;
|
||||
if (currentFuture != null) {
|
||||
currentFuture.cancel(true);
|
||||
}
|
||||
markCancelled();
|
||||
return true;
|
||||
}
|
||||
|
||||
public Map<String, Object> toResponse() {
|
||||
long total = totalBytes.get();
|
||||
long transferred = transferredBytes.get();
|
||||
int progress = total > 0 ? (int) Math.min(100, Math.round((transferred * 100.0) / total)) :
|
||||
(("success".equals(status) || "error".equals(status) || "cancelled".equals(status)) ? 100 : 0);
|
||||
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
result.put("taskId", taskId);
|
||||
result.put("userId", userId);
|
||||
result.put("sourceConnectionId", sourceConnectionId);
|
||||
result.put("targetConnectionId", targetConnectionId);
|
||||
result.put("sourcePath", sourcePath);
|
||||
result.put("targetPath", targetPath);
|
||||
result.put("status", status);
|
||||
result.put("progress", progress);
|
||||
result.put("transferredBytes", transferred);
|
||||
result.put("totalBytes", total);
|
||||
result.put("createdAt", createdAt);
|
||||
result.put("startedAt", startedAt);
|
||||
result.put("finishedAt", finishedAt);
|
||||
if (error != null && !error.isEmpty()) {
|
||||
result.put("error", error);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public boolean isExpired(long now, long timeoutMillis) {
|
||||
if ("queued".equals(status) || "running".equals(status)) {
|
||||
return false;
|
||||
}
|
||||
long endTime = finishedAt > 0 ? finishedAt : createdAt;
|
||||
return now - endTime > timeoutMillis;
|
||||
}
|
||||
}
|
||||
|
||||
public static class UploadTaskStatus {
|
||||
private final String taskId;
|
||||
private final Long userId;
|
||||
private final Long connectionId;
|
||||
private final String path;
|
||||
private final String filename;
|
||||
private final long fileSize;
|
||||
private final long createdAt;
|
||||
private volatile String status;
|
||||
private volatile String error;
|
||||
private volatile long startedAt;
|
||||
private volatile long finishedAt;
|
||||
private final AtomicLong totalBytes;
|
||||
private final AtomicLong transferredBytes;
|
||||
private volatile Future<?> future;
|
||||
private volatile SftpController controller;
|
||||
|
||||
public UploadTaskStatus(String taskId, Long userId, Long connectionId,
|
||||
String path, String filename, long fileSize) {
|
||||
this.taskId = taskId;
|
||||
this.userId = userId;
|
||||
this.connectionId = connectionId;
|
||||
this.path = path;
|
||||
this.filename = filename;
|
||||
this.fileSize = fileSize;
|
||||
this.createdAt = System.currentTimeMillis();
|
||||
this.status = "queued";
|
||||
this.totalBytes = new AtomicLong(fileSize);
|
||||
this.transferredBytes = new AtomicLong(0);
|
||||
}
|
||||
|
||||
public long getTotalBytes() {
|
||||
return totalBytes.get();
|
||||
}
|
||||
|
||||
public void setController(SftpController controller) {
|
||||
this.controller = controller;
|
||||
}
|
||||
|
||||
public void setFuture(Future<?> future) {
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
public void setStatus(String status) {
|
||||
this.status = status;
|
||||
if ("running".equals(status) && startedAt == 0) {
|
||||
startedAt = System.currentTimeMillis();
|
||||
}
|
||||
notifyProgress();
|
||||
}
|
||||
|
||||
public void setProgress(long transferred, long total) {
|
||||
if (startedAt == 0) {
|
||||
startedAt = System.currentTimeMillis();
|
||||
}
|
||||
transferredBytes.set(Math.max(0, transferred));
|
||||
if (total > 0) {
|
||||
totalBytes.set(Math.max(0, total));
|
||||
}
|
||||
notifyProgress();
|
||||
}
|
||||
|
||||
public void markSuccess() {
|
||||
long total = totalBytes.get();
|
||||
if (total > 0) {
|
||||
transferredBytes.set(total);
|
||||
}
|
||||
status = "success";
|
||||
finishedAt = System.currentTimeMillis();
|
||||
notifyProgress();
|
||||
}
|
||||
|
||||
public void markError(String message) {
|
||||
status = "error";
|
||||
error = message;
|
||||
finishedAt = System.currentTimeMillis();
|
||||
notifyProgress();
|
||||
}
|
||||
|
||||
private void notifyProgress() {
|
||||
if (controller != null) {
|
||||
String taskKey = controller.uploadTaskKey(userId, taskId);
|
||||
controller.broadcastProgress(taskKey, toResponse());
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, Object> toResponse() {
|
||||
long total = totalBytes.get();
|
||||
long transferred = transferredBytes.get();
|
||||
int progress = total > 0 ? (int) Math.min(100, Math.round((transferred * 100.0) / total)) :
|
||||
(("success".equals(status) || "error".equals(status)) ? 100 : 0);
|
||||
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
result.put("taskId", taskId);
|
||||
result.put("status", status);
|
||||
result.put("progress", progress);
|
||||
result.put("transferredBytes", transferred);
|
||||
result.put("totalBytes", total);
|
||||
result.put("filename", filename);
|
||||
result.put("createdAt", createdAt);
|
||||
result.put("startedAt", startedAt);
|
||||
result.put("finishedAt", finishedAt);
|
||||
if (error != null && !error.isEmpty()) {
|
||||
result.put("error", error);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public boolean isExpired(long now, long timeoutMillis) {
|
||||
if ("queued".equals(status) || "running".equals(status)) {
|
||||
return false;
|
||||
}
|
||||
long endTime = finishedAt > 0 ? finishedAt : createdAt;
|
||||
return now - endTime > timeoutMillis;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,8 +52,9 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
|
||||
if (StringUtils.hasText(bearerToken) && bearerToken.startsWith("Bearer ")) {
|
||||
return bearerToken.substring(7);
|
||||
}
|
||||
// WebSocket handshake sends token as query param
|
||||
if (request.getRequestURI() != null && request.getRequestURI().startsWith("/ws/")) {
|
||||
// WebSocket handshake and SSE endpoints send token as query param
|
||||
String uri = request.getRequestURI();
|
||||
if (uri != null && (uri.startsWith("/ws/") || uri.contains("/progress"))) {
|
||||
String token = request.getParameter("token");
|
||||
if (StringUtils.hasText(token)) {
|
||||
return token;
|
||||
|
||||
@@ -1,26 +1,26 @@
|
||||
package com.sshmanager.service;
|
||||
|
||||
import com.jcraft.jsch.ChannelSftp;
|
||||
import com.jcraft.jsch.JSch;
|
||||
import com.jcraft.jsch.Session;
|
||||
import com.jcraft.jsch.SftpException;
|
||||
import com.sshmanager.entity.Connection;
|
||||
import org.springframework.stereotype.Service;
|
||||
import com.jcraft.jsch.ChannelSftp;
|
||||
import com.jcraft.jsch.JSch;
|
||||
import com.jcraft.jsch.Session;
|
||||
import com.jcraft.jsch.SftpATTRS;
|
||||
import com.jcraft.jsch.SftpException;
|
||||
import com.jcraft.jsch.SftpProgressMonitor;
|
||||
import com.sshmanager.entity.Connection;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PipedInputStream;
|
||||
import java.io.PipedOutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@Service
|
||||
public class SftpService {
|
||||
@@ -90,7 +90,7 @@ public class SftpService {
|
||||
}
|
||||
}
|
||||
|
||||
public static class FileInfo {
|
||||
public static class FileInfo {
|
||||
public String name;
|
||||
public boolean directory;
|
||||
public long size;
|
||||
@@ -101,8 +101,14 @@ public class SftpService {
|
||||
this.directory = directory;
|
||||
this.size = size;
|
||||
this.mtime = mtime;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public interface TransferProgressListener {
|
||||
void onStart(long totalBytes);
|
||||
|
||||
void onProgress(long transferredBytes, long totalBytes);
|
||||
}
|
||||
|
||||
public List<FileInfo> listFiles(SftpSession sftpSession, String path) throws Exception {
|
||||
String listPath = (path == null || path.trim().isEmpty()) ? "." : path.trim();
|
||||
@@ -166,6 +172,30 @@ public class SftpService {
|
||||
|
||||
public void upload(SftpSession sftpSession, String remotePath, InputStream in) throws Exception {
|
||||
sftpSession.getChannel().put(in, remotePath);
|
||||
}
|
||||
|
||||
public void upload(SftpSession sftpSession, String remotePath, InputStream in, TransferProgressListener progressListener) throws Exception {
|
||||
sftpSession.getChannel().put(in, remotePath, new SftpProgressMonitor() {
|
||||
@Override
|
||||
public void init(int op, String src, String dest, long max) {
|
||||
if (progressListener != null) {
|
||||
progressListener.onStart(max);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean count(long count) {
|
||||
if (progressListener != null) {
|
||||
progressListener.onProgress(count, 0);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void end() {
|
||||
// Progress listener will be notified by controller
|
||||
}
|
||||
}, ChannelSftp.OVERWRITE);
|
||||
}
|
||||
|
||||
public void delete(SftpSession sftpSession, String remotePath, boolean isDir) throws Exception {
|
||||
@@ -196,28 +226,75 @@ public class SftpService {
|
||||
* Transfer a single file from source session to target session (streaming, no full file in memory).
|
||||
* Fails if sourcePath is a directory.
|
||||
*/
|
||||
public void transferRemote(SftpSession source, String sourcePath, SftpSession target, String targetPath)
|
||||
throws Exception {
|
||||
if (source.getChannel().stat(sourcePath).isDir()) {
|
||||
throw new IllegalArgumentException("Source path is a directory; only single file transfer is supported");
|
||||
}
|
||||
final int pipeBufferSize = 65536;
|
||||
PipedOutputStream pos = new PipedOutputStream();
|
||||
PipedInputStream pis = new PipedInputStream(pos, pipeBufferSize);
|
||||
|
||||
Future<?> putFuture = executorService.submit(() -> {
|
||||
try {
|
||||
target.getChannel().put(pis, targetPath);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
source.getChannel().get(sourcePath, pos);
|
||||
pos.close();
|
||||
putFuture.get(5, TimeUnit.MINUTES);
|
||||
try {
|
||||
pis.close();
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
public void transferRemote(SftpSession source, String sourcePath, SftpSession target, String targetPath)
|
||||
throws Exception {
|
||||
transferRemote(source, sourcePath, target, targetPath, null);
|
||||
}
|
||||
|
||||
public void transferRemote(SftpSession source,
|
||||
String sourcePath,
|
||||
SftpSession target,
|
||||
String targetPath,
|
||||
TransferProgressListener progressListener) throws Exception {
|
||||
SftpATTRS attrs = source.getChannel().stat(sourcePath);
|
||||
if (attrs.isDir()) {
|
||||
throw new IllegalArgumentException("Source path is a directory; only single file transfer is supported");
|
||||
}
|
||||
final long totalBytes = attrs.getSize();
|
||||
final int pipeBufferSize = 65536;
|
||||
PipedOutputStream pos = new PipedOutputStream();
|
||||
PipedInputStream pis = new PipedInputStream(pos, pipeBufferSize);
|
||||
AtomicLong transferredBytes = new AtomicLong(0);
|
||||
|
||||
if (progressListener != null) {
|
||||
progressListener.onStart(totalBytes);
|
||||
}
|
||||
|
||||
Future<?> putFuture = executorService.submit(() -> {
|
||||
try {
|
||||
target.getChannel().put(pis, targetPath, new SftpProgressMonitor() {
|
||||
@Override
|
||||
public void init(int op, String src, String dest, long max) {
|
||||
if (progressListener != null) {
|
||||
progressListener.onStart(totalBytes);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean count(long count) {
|
||||
long current = transferredBytes.addAndGet(count);
|
||||
if (progressListener != null) {
|
||||
progressListener.onProgress(current, totalBytes);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void end() {
|
||||
if (progressListener != null) {
|
||||
progressListener.onProgress(totalBytes, totalBytes);
|
||||
}
|
||||
}
|
||||
}, ChannelSftp.OVERWRITE);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
try {
|
||||
source.getChannel().get(sourcePath, pos);
|
||||
} finally {
|
||||
try {
|
||||
pos.close();
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
try {
|
||||
putFuture.get();
|
||||
} finally {
|
||||
try {
|
||||
pis.close();
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user