Implement session locking in SftpController to ensure thread safety during concurrent SFTP operations. Introduce a method to handle session locks and improve error handling by forcing reconnections on exceptions. This change addresses potential issues with shared ChannelSftp instances in concurrent requests.
This commit is contained in:
@@ -20,6 +20,7 @@ import java.util.HashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@@ -33,6 +34,11 @@ public class SftpController {
|
|||||||
private final SftpService sftpService;
|
private final SftpService sftpService;
|
||||||
|
|
||||||
private final Map<String, SftpService.SftpSession> sessions = new ConcurrentHashMap<>();
|
private final Map<String, SftpService.SftpSession> sessions = new ConcurrentHashMap<>();
|
||||||
|
/**
|
||||||
|
* JSch ChannelSftp is not thread-safe. If the frontend triggers concurrent requests (e.g. rapid ".." navigation),
|
||||||
|
* sharing one ChannelSftp can crash with internal stream exceptions. We serialize all SFTP ops per (user, connection).
|
||||||
|
*/
|
||||||
|
private final Map<String, Object> sessionLocks = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public SftpController(ConnectionService connectionService,
|
public SftpController(ConnectionService connectionService,
|
||||||
UserRepository userRepository,
|
UserRepository userRepository,
|
||||||
@@ -51,6 +57,13 @@ public class SftpController {
|
|||||||
return userId + ":" + connectionId;
|
return userId + ":" + connectionId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private <T> T withSessionLock(String key, Supplier<T> action) {
|
||||||
|
Object lock = sessionLocks.computeIfAbsent(key, k -> new Object());
|
||||||
|
synchronized (lock) {
|
||||||
|
return action.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private SftpService.SftpSession getOrCreateSession(Long connectionId, Long userId) throws Exception {
|
private SftpService.SftpSession getOrCreateSession(Long connectionId, Long userId) throws Exception {
|
||||||
String key = sessionKey(userId, connectionId);
|
String key = sessionKey(userId, connectionId);
|
||||||
SftpService.SftpSession session = sessions.get(key);
|
SftpService.SftpSession session = sessions.get(key);
|
||||||
@@ -72,12 +85,24 @@ public class SftpController {
|
|||||||
Authentication authentication) {
|
Authentication authentication) {
|
||||||
try {
|
try {
|
||||||
Long userId = getCurrentUserId(authentication);
|
Long userId = getCurrentUserId(authentication);
|
||||||
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
String key = sessionKey(userId, connectionId);
|
||||||
List<SftpService.FileInfo> files = sftpService.listFiles(session, path);
|
return withSessionLock(key, () -> {
|
||||||
List<SftpFileInfo> dtos = files.stream()
|
try {
|
||||||
.map(f -> new SftpFileInfo(f.name, f.directory, f.size, f.mtime))
|
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
||||||
.collect(Collectors.toList());
|
List<SftpService.FileInfo> files = sftpService.listFiles(session, path);
|
||||||
return ResponseEntity.ok(dtos);
|
List<SftpFileInfo> dtos = files.stream()
|
||||||
|
.map(f -> new SftpFileInfo(f.name, f.directory, f.size, f.mtime))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
return ResponseEntity.ok(dtos);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// If the underlying SFTP channel got into a bad state, force reconnect on next request.
|
||||||
|
SftpService.SftpSession existing = sessions.remove(key);
|
||||||
|
if (existing != null) {
|
||||||
|
existing.disconnect();
|
||||||
|
}
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String errorMsg = toSftpErrorMessage(e, path, "list");
|
String errorMsg = toSftpErrorMessage(e, path, "list");
|
||||||
log.warn("SFTP list failed: connectionId={}, path={}, error={}", connectionId, path, errorMsg, e);
|
log.warn("SFTP list failed: connectionId={}, path={}, error={}", connectionId, path, errorMsg, e);
|
||||||
@@ -91,12 +116,16 @@ public class SftpController {
|
|||||||
if (e.getMessage() != null && !e.getMessage().trim().isEmpty()) {
|
if (e.getMessage() != null && !e.getMessage().trim().isEmpty()) {
|
||||||
return e.getMessage();
|
return e.getMessage();
|
||||||
}
|
}
|
||||||
Throwable cause = e.getCause();
|
// Unwrap nested RuntimeExceptions to find the underlying SftpException (if any).
|
||||||
if (cause instanceof SftpException) {
|
Throwable cur = e;
|
||||||
return SftpService.formatSftpExceptionMessage((SftpException) cause, path, operation);
|
for (int i = 0; i < 10 && cur != null; i++) {
|
||||||
}
|
if (cur instanceof SftpException) {
|
||||||
if (e instanceof SftpException) {
|
return SftpService.formatSftpExceptionMessage((SftpException) cur, path, operation);
|
||||||
return SftpService.formatSftpExceptionMessage((SftpException) e, path, operation);
|
}
|
||||||
|
if (cur.getMessage() != null && !cur.getMessage().trim().isEmpty()) {
|
||||||
|
return cur.getMessage();
|
||||||
|
}
|
||||||
|
cur = cur.getCause();
|
||||||
}
|
}
|
||||||
return operation + " failed";
|
return operation + " failed";
|
||||||
}
|
}
|
||||||
@@ -107,11 +136,22 @@ public class SftpController {
|
|||||||
Authentication authentication) {
|
Authentication authentication) {
|
||||||
try {
|
try {
|
||||||
Long userId = getCurrentUserId(authentication);
|
Long userId = getCurrentUserId(authentication);
|
||||||
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
String key = sessionKey(userId, connectionId);
|
||||||
String pwd = sftpService.pwd(session);
|
return withSessionLock(key, () -> {
|
||||||
Map<String, String> result = new HashMap<>();
|
try {
|
||||||
result.put("path", pwd);
|
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
||||||
return ResponseEntity.ok(result);
|
String pwd = sftpService.pwd(session);
|
||||||
|
Map<String, String> result = new HashMap<>();
|
||||||
|
result.put("path", pwd);
|
||||||
|
return ResponseEntity.ok(result);
|
||||||
|
} catch (Exception e) {
|
||||||
|
SftpService.SftpSession existing = sessions.remove(key);
|
||||||
|
if (existing != null) {
|
||||||
|
existing.disconnect();
|
||||||
|
}
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("SFTP pwd failed: connectionId={}", connectionId, e);
|
log.warn("SFTP pwd failed: connectionId={}", connectionId, e);
|
||||||
Map<String, String> err = new HashMap<>();
|
Map<String, String> err = new HashMap<>();
|
||||||
@@ -127,13 +167,24 @@ public class SftpController {
|
|||||||
Authentication authentication) {
|
Authentication authentication) {
|
||||||
try {
|
try {
|
||||||
Long userId = getCurrentUserId(authentication);
|
Long userId = getCurrentUserId(authentication);
|
||||||
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
String key = sessionKey(userId, connectionId);
|
||||||
byte[] data = sftpService.download(session, path);
|
return withSessionLock(key, () -> {
|
||||||
String filename = path.contains("/") ? path.substring(path.lastIndexOf('/') + 1) : path;
|
try {
|
||||||
return ResponseEntity.ok()
|
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
||||||
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
|
byte[] data = sftpService.download(session, path);
|
||||||
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
String filename = path.contains("/") ? path.substring(path.lastIndexOf('/') + 1) : path;
|
||||||
.body(data);
|
return ResponseEntity.ok()
|
||||||
|
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
|
||||||
|
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||||
|
.body(data);
|
||||||
|
} catch (Exception e) {
|
||||||
|
SftpService.SftpSession existing = sessions.remove(key);
|
||||||
|
if (existing != null) {
|
||||||
|
existing.disconnect();
|
||||||
|
}
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return ResponseEntity.status(500).build();
|
return ResponseEntity.status(500).build();
|
||||||
}
|
}
|
||||||
@@ -147,14 +198,25 @@ public class SftpController {
|
|||||||
Authentication authentication) {
|
Authentication authentication) {
|
||||||
try {
|
try {
|
||||||
Long userId = getCurrentUserId(authentication);
|
Long userId = getCurrentUserId(authentication);
|
||||||
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
String key = sessionKey(userId, connectionId);
|
||||||
String remotePath = (path == null || path.isEmpty() || path.equals("/"))
|
return withSessionLock(key, () -> {
|
||||||
? "/" + file.getOriginalFilename()
|
try {
|
||||||
: (path.endsWith("/") ? path + file.getOriginalFilename() : path + "/" + file.getOriginalFilename());
|
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
||||||
sftpService.upload(session, remotePath, file.getBytes());
|
String remotePath = (path == null || path.isEmpty() || path.equals("/"))
|
||||||
Map<String, String> result = new HashMap<>();
|
? "/" + file.getOriginalFilename()
|
||||||
result.put("message", "Uploaded");
|
: (path.endsWith("/") ? path + file.getOriginalFilename() : path + "/" + file.getOriginalFilename());
|
||||||
return ResponseEntity.ok(result);
|
sftpService.upload(session, remotePath, file.getBytes());
|
||||||
|
Map<String, String> result = new HashMap<>();
|
||||||
|
result.put("message", "Uploaded");
|
||||||
|
return ResponseEntity.ok(result);
|
||||||
|
} catch (Exception e) {
|
||||||
|
SftpService.SftpSession existing = sessions.remove(key);
|
||||||
|
if (existing != null) {
|
||||||
|
existing.disconnect();
|
||||||
|
}
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Map<String, String> error = new HashMap<>();
|
Map<String, String> error = new HashMap<>();
|
||||||
error.put("error", e.getMessage());
|
error.put("error", e.getMessage());
|
||||||
@@ -170,11 +232,22 @@ public class SftpController {
|
|||||||
Authentication authentication) {
|
Authentication authentication) {
|
||||||
try {
|
try {
|
||||||
Long userId = getCurrentUserId(authentication);
|
Long userId = getCurrentUserId(authentication);
|
||||||
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
String key = sessionKey(userId, connectionId);
|
||||||
sftpService.delete(session, path, directory);
|
return withSessionLock(key, () -> {
|
||||||
Map<String, String> result = new HashMap<>();
|
try {
|
||||||
result.put("message", "Deleted");
|
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
||||||
return ResponseEntity.ok(result);
|
sftpService.delete(session, path, directory);
|
||||||
|
Map<String, String> result = new HashMap<>();
|
||||||
|
result.put("message", "Deleted");
|
||||||
|
return ResponseEntity.ok(result);
|
||||||
|
} catch (Exception e) {
|
||||||
|
SftpService.SftpSession existing = sessions.remove(key);
|
||||||
|
if (existing != null) {
|
||||||
|
existing.disconnect();
|
||||||
|
}
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Map<String, String> error = new HashMap<>();
|
Map<String, String> error = new HashMap<>();
|
||||||
error.put("error", e.getMessage());
|
error.put("error", e.getMessage());
|
||||||
@@ -189,11 +262,22 @@ public class SftpController {
|
|||||||
Authentication authentication) {
|
Authentication authentication) {
|
||||||
try {
|
try {
|
||||||
Long userId = getCurrentUserId(authentication);
|
Long userId = getCurrentUserId(authentication);
|
||||||
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
String key = sessionKey(userId, connectionId);
|
||||||
sftpService.mkdir(session, path);
|
return withSessionLock(key, () -> {
|
||||||
Map<String, String> result = new HashMap<>();
|
try {
|
||||||
result.put("message", "Created");
|
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
||||||
return ResponseEntity.ok(result);
|
sftpService.mkdir(session, path);
|
||||||
|
Map<String, String> result = new HashMap<>();
|
||||||
|
result.put("message", "Created");
|
||||||
|
return ResponseEntity.ok(result);
|
||||||
|
} catch (Exception e) {
|
||||||
|
SftpService.SftpSession existing = sessions.remove(key);
|
||||||
|
if (existing != null) {
|
||||||
|
existing.disconnect();
|
||||||
|
}
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Map<String, String> error = new HashMap<>();
|
Map<String, String> error = new HashMap<>();
|
||||||
error.put("error", e.getMessage());
|
error.put("error", e.getMessage());
|
||||||
@@ -209,11 +293,22 @@ public class SftpController {
|
|||||||
Authentication authentication) {
|
Authentication authentication) {
|
||||||
try {
|
try {
|
||||||
Long userId = getCurrentUserId(authentication);
|
Long userId = getCurrentUserId(authentication);
|
||||||
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
String key = sessionKey(userId, connectionId);
|
||||||
sftpService.rename(session, oldPath, newPath);
|
return withSessionLock(key, () -> {
|
||||||
Map<String, String> result = new HashMap<>();
|
try {
|
||||||
result.put("message", "Renamed");
|
SftpService.SftpSession session = getOrCreateSession(connectionId, userId);
|
||||||
return ResponseEntity.ok(result);
|
sftpService.rename(session, oldPath, newPath);
|
||||||
|
Map<String, String> result = new HashMap<>();
|
||||||
|
result.put("message", "Renamed");
|
||||||
|
return ResponseEntity.ok(result);
|
||||||
|
} catch (Exception e) {
|
||||||
|
SftpService.SftpSession existing = sessions.remove(key);
|
||||||
|
if (existing != null) {
|
||||||
|
existing.disconnect();
|
||||||
|
}
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Map<String, String> error = new HashMap<>();
|
Map<String, String> error = new HashMap<>();
|
||||||
error.put("error", e.getMessage());
|
error.put("error", e.getMessage());
|
||||||
@@ -267,6 +362,7 @@ public class SftpController {
|
|||||||
if (session != null) {
|
if (session != null) {
|
||||||
session.disconnect();
|
session.disconnect();
|
||||||
}
|
}
|
||||||
|
sessionLocks.remove(key);
|
||||||
Map<String, String> result = new HashMap<>();
|
Map<String, String> result = new HashMap<>();
|
||||||
result.put("message", "Disconnected");
|
result.put("message", "Disconnected");
|
||||||
return ResponseEntity.ok(result);
|
return ResponseEntity.ok(result);
|
||||||
|
|||||||
Reference in New Issue
Block a user