fix oss文件下载

This commit is contained in:
秋辞未寒
2026-03-26 00:19:44 +08:00
parent e00837a26f
commit f45f58c340
6 changed files with 103 additions and 33 deletions

View File

@@ -10,6 +10,7 @@ import org.dromara.common.oss.io.OutputStreamDownloadSubscriber;
import org.dromara.common.oss.model.GetObjectResult;
import org.dromara.common.oss.model.HandleAsyncResult;
import org.dromara.common.oss.model.PutObjectResult;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.ResponsePublisher;
@@ -29,7 +30,7 @@ import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
@@ -309,7 +310,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
try {
ResponsePublisher<GetObjectResponse> publisher = doCustomDownload(builder -> builder.bucket(bucket).key(key), AsyncResponseTransformer.toPublisher(), null);
GetObjectResult getObjectResult = buildGetObjectResult(key, publisher.response());
publisher.subscribe(downloadSubscriber);
publisher.subscribe(downloadSubscriber).join();
return getObjectResult;
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
@@ -319,6 +320,21 @@ public abstract class AbstractOssClientImpl implements OssClient {
}
}
@Override
public <T> T bucketDownload(String bucket, String key, BiFunction<GetObjectResult, InputStream, T> downloadTransformer) {
try {
ResponseInputStream<GetObjectResponse> responseInputStream = doCustomDownload(builder -> builder.bucket(bucket).key(key), AsyncResponseTransformer.toBlockingInputStream(), null);
GetObjectResponse response = responseInputStream.response();
GetObjectResult getObjectResult = buildGetObjectResult(key, response);
return downloadTransformer.apply(getObjectResult, responseInputStream);
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
}
}
@Override
public GetObjectResult bucketDownload(String bucket, String key, Path path) {
try (OutputStream out = Files.newOutputStream(path)) {
@@ -362,7 +378,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
return GetObjectResult.form(
key,
response.eTag(),
LocalDateTime.from(response.lastModified()),
response.lastModified().atOffset(ZoneOffset.UTC).toLocalDateTime(),
response.contentLength(),
response.contentType(),
response.contentDisposition(),
@@ -446,6 +462,11 @@ public abstract class AbstractOssClientImpl implements OssClient {
return bucketDownload(defaultBucket(), key, downloadSubscriber);
}
@Override
public <T> T download(String key, BiFunction<GetObjectResult, InputStream, T> downloadTransformer) {
return bucketDownload(defaultBucket(), key, downloadTransformer);
}
@Override
public GetObjectResult download(String key, Path path) {
return bucketDownload(defaultBucket(), key, path);

View File

@@ -43,12 +43,12 @@ public interface OssClient extends AutoCloseable {
/**
* S3 存储客户端ID
*
* <p>
* 用于标识客户端,初始化后不允许更改
*
* @return S3 存储客户端ID
*/
default String clientId(){
default String clientId() {
return IdUtil.fastSimpleUUID();
}
@@ -219,6 +219,16 @@ public interface OssClient extends AutoCloseable {
*/
GetObjectResult bucketDownload(String bucket, String key, OutputStreamDownloadSubscriber downloadSubscriber);
/**
* 将指定存储桶中的对象下载到转换器中,由使用者决定返回值。
*
* @param bucket 存储桶名称
* @param key 对象键
* @param downloadTransformer 下载转换器
* @return 下载结果
*/
<T> T bucketDownload(String bucket, String key, BiFunction<GetObjectResult, InputStream, T> downloadTransformer);
/**
* 将指定存储桶中的对象下载到本地路径。
*
@@ -364,6 +374,15 @@ public interface OssClient extends AutoCloseable {
*/
GetObjectResult download(String key, OutputStreamDownloadSubscriber downloadSubscriber);
/**
* 将指定存储桶中的对象下载到转换器中,由使用者决定返回值。
*
* @param key 对象键
* @param downloadTransformer 下载转换器
* @return 下载结果
*/
<T> T download(String key, BiFunction<GetObjectResult, InputStream, T> downloadTransformer);
/**
* 将默认存储桶中的对象下载到本地路径。
*

View File

@@ -3,7 +3,6 @@ package org.dromara.common.oss.io;
import org.dromara.common.oss.exception.S3StorageException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
@@ -19,11 +18,15 @@ public class OutputStreamDownloadSubscriber implements Consumer<ByteBuffer>, Aut
private final WritableByteChannel channel;
private OutputStreamDownloadSubscriber(WritableByteChannel channel) {
private final boolean allowAutoClose;
private OutputStreamDownloadSubscriber(WritableByteChannel channel, boolean allowAutoClose) {
this.channel = channel;
this.allowAutoClose = allowAutoClose;
}
private OutputStreamDownloadSubscriber(OutputStream out) {
private OutputStreamDownloadSubscriber(OutputStream out, boolean allowAutoClose) {
this.allowAutoClose = allowAutoClose;
// 创建可写入的字节通道
if (out instanceof FileOutputStream outputStream) {
// 如果是文件输入流,直接获取文件输出流的 Channel
@@ -35,18 +38,18 @@ public class OutputStreamDownloadSubscriber implements Consumer<ByteBuffer>, Aut
@Override
public void accept(ByteBuffer byteBuffer) {
try (channel) {
try {
while (byteBuffer.hasRemaining()) {
channel.write(byteBuffer);
}
} catch (IOException e) {
} catch (Exception e) {
throw S3StorageException.form(e);
}
}
@Override
public void close() throws Exception {
if (channel.isOpen()) {
if (channel.isOpen() && allowAutoClose) {
channel.close();
}
}
@@ -58,7 +61,18 @@ public class OutputStreamDownloadSubscriber implements Consumer<ByteBuffer>, Aut
* @return 输出流下载订阅器
*/
public static OutputStreamDownloadSubscriber create(OutputStream out) {
return new OutputStreamDownloadSubscriber(out);
return create(out, false);
}
/**
* 创建一个输出流下载订阅器
*
* @param out 输出流
* @param allowAutoClose 是否允许自动关闭流
* @return 输出流下载订阅器
*/
public static OutputStreamDownloadSubscriber create(OutputStream out, boolean allowAutoClose) {
return new OutputStreamDownloadSubscriber(out, allowAutoClose);
}
/**
@@ -68,7 +82,18 @@ public class OutputStreamDownloadSubscriber implements Consumer<ByteBuffer>, Aut
* @return 输出流下载订阅器
*/
public static OutputStreamDownloadSubscriber create(WritableByteChannel channel) {
return new OutputStreamDownloadSubscriber(channel);
return create(channel, false);
}
/**
* 创建一个输出流下载订阅器
*
* @param channel 可写字节通道
* @param allowAutoClose 是否允许自动关闭流
* @return 输出流下载订阅器
*/
public static OutputStreamDownloadSubscriber create(WritableByteChannel channel, boolean allowAutoClose) {
return new OutputStreamDownloadSubscriber(channel, allowAutoClose);
}
}