update 更新 新的S3客户端支持初始化和刷新配置

This commit is contained in:
秋辞未寒
2026-03-21 08:31:09 +08:00
parent 160b501b37
commit aa38a7b98a
22 changed files with 487 additions and 456 deletions

View File

@@ -1,14 +0,0 @@
package org.dromara.common.oss.s3.builder;
/**
* 构建器
*
* @param <T> 参数类型
* @param <R> 构建目标类型
* @author 秋辞未寒
*/
public interface Builder<T,R> {
R build(T param);
}

View File

@@ -1,40 +0,0 @@
package org.dromara.common.oss.s3.builder;
import org.dromara.common.oss.s3.config.S3StorageClientConfig;
import org.dromara.common.oss.s3.exception.S3StorageException;
import org.dromara.common.oss.s3.util.BucketUrlUtil;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Optional;
/**
* 云服务商文件对象桶URL构建器
*
* @author 秋辞未寒
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public enum CloudServiceBucketUrlBuilder implements StrBuilder<S3StorageClientConfig> {
INSTANCE;
@Override
public String build(S3StorageClientConfig config) {
boolean useHttps = config.useHttps();
Optional<String> domainOpt = config.domain().filter(s -> !s.isBlank());
// 如果已经配置了自定义域名,则优先使用域名
if (domainOpt.isPresent()) {
// 云服务商一般都支持桶映射到域名,这里不再特殊处理,仅处理链接的协议头即可
return BucketUrlUtil.getDomainUrl(useHttps, domainOpt.get());
}
// 否则使用站点
String endpoint = config.endpoint()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.of("endpoint is not configured."));
// 如果未配置桶,则抛异常
String bucket = config.bucket()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.of("bucket is not configured."));
return BucketUrlUtil.getSiteStyleBucketUrl(useHttps, endpoint, bucket);
}
}

View File

@@ -1,86 +0,0 @@
package org.dromara.common.oss.s3.builder;
import org.dromara.common.oss.s3.client.S3StorageClient;
import org.dromara.common.oss.s3.client.S3StorageClientImpl;
import org.dromara.common.oss.s3.config.S3StorageClientConfig;
import org.dromara.common.oss.s3.exception.S3StorageException;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import java.net.URI;
import java.time.Duration;
/**
* 默认S3存储客户端构建器
*
* @author 秋辞未寒
*/
public enum DefaultS3StorageClientBuilder implements S3StorageClientBuilder<S3StorageClientConfig> {
INSTANCE;
@Override
public S3StorageClient build(S3StorageClientConfig config) {
String accessKey = config.accessKey()
.filter(bucket -> !bucket.isBlank())
.orElseThrow(() -> S3StorageException.of("accessKey is not configured."));
String secretKey = config.secretKey()
.filter(bucket -> !bucket.isBlank())
.orElseThrow(() -> S3StorageException.of("secretKey is not configured."));
Region region = config.region()
.orElse(Region.US_EAST_1);
String endpointUrl = config.getEndpointUrl();
String domainUrl = config.getDomainUrl();
// MinIO 使用 HTTPS 限制使用域名访问,站点填域名。需要启用路径样式访问
boolean usePathStyleAccess = config.usePathStyleAccess();
try {
// 创建 AWS 认证信息
StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
// 创建AWS基于 Netty 的 S3 客户端
S3AsyncClient client = S3AsyncClient.builder()
.credentialsProvider(credentialsProvider)
.endpointOverride(URI.create(endpointUrl))
.region(region)
.forcePathStyle(usePathStyleAccess)
.httpClient(
NettyNioAsyncHttpClient.builder()
.connectionTimeout(Duration.ofSeconds(60))
.connectionAcquisitionTimeout(Duration.ofSeconds(30))
.maxConcurrency(100)
.maxPendingConnectionAcquires(1000)
.build()
)
.build();
//AWS基于 CRT 的 S3 AsyncClient 实例用作 S3 传输管理器的底层客户端
S3TransferManager transferManager = S3TransferManager.builder().s3Client(client).build();
// 创建 预签名 URL 的生成器 实例,用于生成 S3 预签名 URL
S3Presigner presigner = S3Presigner.builder()
.region(region)
.credentialsProvider(credentialsProvider)
.endpointOverride(URI.create(domainUrl))
.serviceConfiguration(
// 创建 S3 配置对象
S3Configuration.builder()
.chunkedEncodingEnabled(false)
.pathStyleAccessEnabled(usePathStyleAccess)
.build()
)
.build();
return new S3StorageClientImpl(config,client,transferManager,presigner);
} catch (Exception e) {
if (e instanceof S3StorageException) {
throw e;
}
throw S3StorageException.of(e);
}
}
}

View File

@@ -1,39 +0,0 @@
package org.dromara.common.oss.s3.builder;
import org.dromara.common.oss.s3.config.S3StorageClientConfig;
import org.dromara.common.oss.s3.exception.S3StorageException;
import org.dromara.common.oss.s3.util.BucketUrlUtil;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Optional;
/**
* MinIO文件对象桶URL构建器
*
* @author 秋辞未寒
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public enum MinioBucketUrlBuilder implements StrBuilder<S3StorageClientConfig> {
INSTANCE;
@Override
public String build(S3StorageClientConfig config) {
boolean useHttps = config.useHttps();
// 如果未配置桶,则抛异常
String bucket = config.bucket()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.of("bucket is not configured."));
Optional<String> domainOpt = config.domain().filter(s -> !s.isBlank());
// 如果已经配置了自定义域名,则优先使用域名
if (domainOpt.isPresent()) {
return BucketUrlUtil.getPathStyleBucketUrl(useHttps, domainOpt.get(), bucket);
}
// 否则使用站点
String endpoint = config.endpoint()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.of("endpoint is not configured."));
return BucketUrlUtil.getPathStyleBucketUrl(useHttps, endpoint, bucket);
}
}

View File

@@ -1,13 +0,0 @@
package org.dromara.common.oss.s3.builder;
import org.dromara.common.oss.s3.client.S3StorageClient;
/**
* S3存储客户端构建器
*
* @param <T> 参数类型
* @author 秋辞未寒
*/
public interface S3StorageClientBuilder<T> extends Builder<T,S3StorageClient> {
}

View File

@@ -1,13 +0,0 @@
package org.dromara.common.oss.s3.builder;
import org.dromara.common.oss.s3.config.S3StorageClientConfig;
/**
* S3存储客户端配置构建器
*
* @param <T> 参数类型
* @author 秋辞未寒
*/
public interface S3StorageClientConfigBuilder<T> extends Builder<T, S3StorageClientConfig> {
}

View File

@@ -1,11 +0,0 @@
package org.dromara.common.oss.s3.builder;
/**
* 字符串构建器
*
* @param <T> 参数类型
* @author 秋辞未寒
*/
public interface StrBuilder<T> extends Builder<T,String> {
}

View File

@@ -31,71 +31,117 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* S3 存储客户端实现类
* 抽象S3存储客户端实现类
*
* @author 秋辞未寒
*/
public class S3StorageClientImpl implements S3StorageClient {
public abstract class AbstractS3StorageClientImpl implements S3StorageClient {
private final AtomicBoolean initialized = new AtomicBoolean(false);
/**
* S3 存储客户端配置
*/
private final S3StorageClientConfig config;
protected S3StorageClientConfig config;
/**
* Amazon S3 异步客户端
*/
private final S3AsyncClient s3AsyncClient;
protected S3AsyncClient s3AsyncClient;
/**
* 用于管理 S3 数据传输的高级工具
*/
private final S3TransferManager s3TransferManager;
protected S3TransferManager s3TransferManager;
/**
* AWS S3 预签名 URL 生成器
*/
private final S3Presigner s3Presigner;
protected S3Presigner s3Presigner;
/**
* 异步调度线程池
*/
private final ExecutorService executorService;
protected ExecutorService asyncExecutor;
public S3StorageClientImpl(S3StorageClientConfig config, S3AsyncClient s3AsyncClient, S3TransferManager s3TransferManager, S3Presigner s3Presigner) {
this(config,s3AsyncClient,s3TransferManager,s3Presigner, Executors.newSingleThreadExecutor());
public AbstractS3StorageClientImpl(S3StorageClientConfig config) {
this.config = config;
this.initialize();
}
public S3StorageClientImpl(S3StorageClientConfig config, S3AsyncClient s3AsyncClient, S3TransferManager s3TransferManager, S3Presigner s3Presigner, ExecutorService executorService) {
this.config = config;
this.s3AsyncClient = s3AsyncClient;
this.s3TransferManager = s3TransferManager;
this.s3Presigner = s3Presigner;
this.executorService = executorService;
@Override
public void initialize() {
// 如果已经是初始化状态则直接返回
if (initialized.get()) {
return;
}
try {
doInitialize();
// 将状态转为已初始化
initialized.compareAndSet(false, true);
} catch (Exception e) {
if (e instanceof S3StorageException) {
throw e;
}
throw S3StorageException.form(e);
}
}
abstract void doInitialize();
@Override
public void refresh(S3StorageClientConfig config) {
if (Objects.equals(this.config, config)) {
return;
}
// 如果状态本来就是未初始化直接则调用初始化
if (!initialized.get()) {
this.initialize();
}
// 将状态转为未初始化
if (initialized.compareAndSet(false, true)) {
try {
this.close();
} catch (Exception e) {
// 异常不影响刷新逻辑此处屏蔽异常
}
// 状态交换成功才进行刷新
this.initialize();
}
}
@Override
public boolean verifyConfig(Function<S3StorageClientConfig, Boolean> verifyConfigAction) {
return Boolean.TRUE.equals(verifyConfigAction.apply(config.copy()));
}
@Override
public boolean verifyConfig(S3StorageClientConfig verifyConfig) {
return verifyConfig(config -> Objects.equals(config, verifyConfig));
}
@Override
public <T> T doCustomUpload(AsyncRequestBody body, Consumer<PutObjectRequest.Builder> putObjectRequestBuilderConsumer, Collection<TransferListener> transferListeners, BiFunction<CompletedUpload, Throwable, T> handleAsyncAction) {
try {
return s3TransferManager.upload(uploadRequestBuilder -> {
uploadRequestBuilder.requestBody(body)
.putObjectRequest(putObjectRequestBuilderConsumer)
.transferListeners(transferListeners);
})
.completionFuture()
.handleAsync(handleAsyncAction)
.join();
uploadRequestBuilder.requestBody(body)
.putObjectRequest(putObjectRequestBuilderConsumer)
.transferListeners(transferListeners);
})
.completionFuture()
.handleAsync(handleAsyncAction)
.join();
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -134,7 +180,7 @@ public class S3StorageClientImpl implements S3StorageClient {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -150,15 +196,15 @@ public class S3StorageClientImpl implements S3StorageClient {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@Override
public PutObjectResult bucketUpload(String bucket, String key, InputStream in, long contentLength) {
AsyncRequestBody body = AsyncRequestBody.fromInputStream(builder -> builder.inputStream(in)
.contentLength(contentLength)
.executor(executorService));
.contentLength(contentLength)
.executor(asyncExecutor));
return bucketUpload(bucket, key, body);
}
@@ -170,7 +216,7 @@ public class S3StorageClientImpl implements S3StorageClient {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -178,33 +224,33 @@ public class S3StorageClientImpl implements S3StorageClient {
private PutObjectResult bucketUpload(String bucket, String key, AsyncRequestBody body) {
HandleAsyncResult<PutObjectResponse> result = doCustomUpload(body, builder -> builder.bucket(bucket).key(key));
if (result.isFailure()) {
throw S3StorageException.of(result.error());
throw S3StorageException.form(result.error());
}
Optional<PutObjectResponse> opt = result.getResult();
if (opt.isEmpty()) {
throw S3StorageException.of("response is empty.");
throw S3StorageException.form("response is empty.");
}
PutObjectResponse response = opt.get();
return PutObjectResult.of(null, key, response.eTag(), response.size());
return PutObjectResult.form(null, key, response.eTag(), response.size());
}
@Override
public <T> T doCustomDownload(Consumer<GetObjectRequest.Builder> getObjectRequestBuilderConsumer, AsyncResponseTransformer<GetObjectResponse, T> responseTransformer, Collection<TransferListener> transferListeners) {
try {
DownloadRequest<T> downloadRequest = DownloadRequest.builder()
.responseTransformer(responseTransformer)
.getObjectRequest(getObjectRequestBuilderConsumer)
.transferListeners(transferListeners)
.build();
.responseTransformer(responseTransformer)
.getObjectRequest(getObjectRequestBuilderConsumer)
.transferListeners(transferListeners)
.build();
return s3TransferManager.download(downloadRequest)
.completionFuture()
.join()
.result();
.completionFuture()
.join()
.result();
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -219,7 +265,7 @@ public class S3StorageClientImpl implements S3StorageClient {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -231,7 +277,7 @@ public class S3StorageClientImpl implements S3StorageClient {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -243,7 +289,7 @@ public class S3StorageClientImpl implements S3StorageClient {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -263,17 +309,17 @@ public class S3StorageClientImpl implements S3StorageClient {
}
private GetObjectResult buildGetObjectResult(String key, GetObjectResponse response) {
return GetObjectResult.of(
key,
response.eTag(),
LocalDateTime.from(response.lastModified()),
response.contentLength(),
response.contentType(),
response.contentDisposition(),
response.contentRange(),
response.contentEncoding(),
response.contentLanguage(),
response.metadata()
return GetObjectResult.form(
key,
response.eTag(),
LocalDateTime.from(response.lastModified()),
response.contentLength(),
response.contentType(),
response.contentDisposition(),
response.contentRange(),
response.contentEncoding(),
response.contentLanguage(),
response.metadata()
);
}
@@ -283,7 +329,7 @@ public class S3StorageClientImpl implements S3StorageClient {
DeleteObjectResponse response = s3AsyncClient.deleteObject(builder -> builder.bucket(bucket).key(key)).join();
return Boolean.TRUE.equals(response.deleteMarker());
} catch (Exception e) {
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -291,13 +337,13 @@ public class S3StorageClientImpl implements S3StorageClient {
public String bucketPresignGetUrl(String bucket, String key, Duration expiredTime) {
try {
return s3Presigner.presignGetObject(getObjectPresignRequestBuilder -> {
getObjectPresignRequestBuilder.signatureDuration(expiredTime)
.getObjectRequest(getObjectRequestBuilder -> getObjectRequestBuilder.bucket(bucket).key(key));
})
.url()
.toExternalForm();
getObjectPresignRequestBuilder.signatureDuration(expiredTime)
.getObjectRequest(getObjectRequestBuilder -> getObjectRequestBuilder.bucket(bucket).key(key));
})
.url()
.toExternalForm();
} catch (Exception e) {
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -305,13 +351,13 @@ public class S3StorageClientImpl implements S3StorageClient {
public String bucketPresignPutUrl(String bucket, String key, Duration expiredTime, Map<String, String> metadata) {
try {
return s3Presigner.presignPutObject(putObjectPresignRequestBuilder -> {
putObjectPresignRequestBuilder.signatureDuration(expiredTime)
.putObjectRequest(putObjectRequestBuilder -> putObjectRequestBuilder.bucket(bucket).key(key).metadata(metadata));
})
.url()
.toExternalForm();
putObjectPresignRequestBuilder.signatureDuration(expiredTime)
.putObjectRequest(putObjectRequestBuilder -> putObjectRequestBuilder.bucket(bucket).key(key).metadata(metadata));
})
.url()
.toExternalForm();
} catch (Exception e) {
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}
@@ -392,26 +438,23 @@ public class S3StorageClientImpl implements S3StorageClient {
private String defaultBucket() {
return config.bucket()
.filter(bucket -> !bucket.isBlank())
.orElseThrow(() -> S3StorageException.of("bucket is not configured."));
}
@Override
public boolean verifyConfig(Function<S3StorageClientConfig,Boolean> verifyConfigAction) {
S3StorageClientConfig copy = S3StorageClientConfig.copy(config);
return Boolean.TRUE.equals(verifyConfigAction.apply(copy));
}
@Override
public boolean verifyConfig(S3StorageClientConfig verifyConfig) {
return verifyConfig(config -> Objects.equals(config,verifyConfig));
.filter(bucket -> !bucket.isBlank())
.orElseThrow(() -> S3StorageException.form("bucket is not configured."));
}
@Override
public void close() throws Exception {
s3TransferManager.close();
s3AsyncClient.close();
s3Presigner.close();
executorService.close();
if (s3TransferManager != null) {
s3TransferManager.close();
}
if (s3AsyncClient != null) {
s3AsyncClient.close();
}
if (s3Presigner != null) {
s3Presigner.close();
}
if (asyncExecutor != null) {
asyncExecutor.close();
}
}
}

View File

@@ -0,0 +1,90 @@
package org.dromara.common.oss.s3.client;
import org.dromara.common.oss.s3.config.S3AsyncExecutorConfig;
import org.dromara.common.oss.s3.config.S3StorageClientConfig;
import org.dromara.common.oss.s3.exception.S3StorageException;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.Executors;
/**
* 默认S3存储客户端实现类。
*
* @author 秋辞未寒
*/
public class DefaultS3StorageClientImpl extends AbstractS3StorageClientImpl {
public DefaultS3StorageClientImpl(S3StorageClientConfig config) {
super(config);
}
@Override
void doInitialize() {
// 校验配置
String accessKey = config.accessKey()
.filter(bucket -> !bucket.isBlank())
.orElseThrow(() -> S3StorageException.form("accessKey is not configured."));
String secretKey = config.secretKey()
.filter(bucket -> !bucket.isBlank())
.orElseThrow(() -> S3StorageException.form("secretKey is not configured."));
String endpointUrl = config.getEndpointUrl();
String domainUrl = config.getDomainUrl();
Region region = config.region().orElse(Region.US_EAST_1);
// MinIO 使用 HTTPS 限制使用域名访问,站点填域名。需要启用路径样式访问
boolean usePathStyleAccess = config.usePathStyleAccess();
// 创建 AWS 认证信息
StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
// 创建AWS基于 Netty 的 S3 客户端
this.s3AsyncClient = S3AsyncClient.builder()
.credentialsProvider(credentialsProvider)
.endpointOverride(URI.create(endpointUrl))
.region(region)
.forcePathStyle(usePathStyleAccess)
.httpClient(
NettyNioAsyncHttpClient.builder()
.connectionTimeout(Duration.ofSeconds(60))
.connectionAcquisitionTimeout(Duration.ofSeconds(30))
.maxConcurrency(100)
.maxPendingConnectionAcquires(1000)
.build()
)
.build();
//AWS基于 CRT 的 S3 AsyncClient 实例用作 S3 传输管理器的底层客户端
this.s3TransferManager = S3TransferManager.builder().s3Client(this.s3AsyncClient).build();
// 创建 预签名 URL 的生成器 实例,用于生成 S3 预签名 URL
this.s3Presigner = S3Presigner.builder()
.region(region)
.credentialsProvider(credentialsProvider)
.endpointOverride(URI.create(domainUrl))
.serviceConfiguration(
// 创建 S3 配置对象
S3Configuration.builder()
.chunkedEncodingEnabled(false)
.pathStyleAccessEnabled(usePathStyleAccess)
.build()
)
.build();
// 创建异步调度器对象
S3AsyncExecutorConfig asyncExecutorConfig = config.asyncExecutorConfig();
// 是否使用虚拟线程
if (asyncExecutorConfig.enabledVirtualThread()) {
this.asyncExecutor = Executors.newVirtualThreadPerTaskExecutor();
} else {
this.asyncExecutor = Executors.newScheduledThreadPool(asyncExecutorConfig.corePoolSize());
}
}
}

View File

@@ -40,6 +40,38 @@ import java.util.function.Function;
*/
public interface S3StorageClient extends AutoCloseable {
/**
* 初始化客户端
*/
void initialize();
/**
* 刷新客户端配置
*
* @param config 配置项
*/
void refresh(S3StorageClientConfig config);
/**
* 校验客户端配置
*
* <p>注意:该方法不会修改任何既有的配置和状态,你看可以理解为这仅仅是一个配置展示的方法,以供调用者根据当前的配置,自行决定是否需要重新构建客户端。</p>
*
* @param verifyConfigAction 校验配置动作函数
* @return 是否一致
*/
boolean verifyConfig(Function<S3StorageClientConfig,Boolean> verifyConfigAction);
/**
* 校验客户端配置与传入的待校验配置是否一致
*
* <p>注意:该方法不会修改任何既有的配置和状态,你看可以理解为这仅仅是一个配置展示的方法,以供调用者根据当前的配置,自行决定是否需要重新构建客户端。</p>
*
* @param verifyConfig 待校验的配置
* @return 是否一致
*/
boolean verifyConfig(S3StorageClientConfig verifyConfig);
/**
* 执行自定义上传请求。
*
@@ -381,24 +413,4 @@ public interface S3StorageClient extends AutoCloseable {
* @return 预签名上传 URL
*/
String presignPutUrl(String key, Duration expiredTime, Map<String, String> metadata);
/**
* 校验客户端配置
*
* <p>注意:该方法不会修改任何既有的配置和状态,你看可以理解为这仅仅是一个配置展示的方法,以供调用者根据当前的配置,自行决定是否需要重新构建客户端。</p>
*
* @param verifyConfigAction 校验配置动作函数
* @return 是否一致
*/
boolean verifyConfig(Function<S3StorageClientConfig,Boolean> verifyConfigAction);
/**
* 校验客户端配置与传入的待校验配置是否一致
*
* <p>注意:该方法不会修改任何既有的配置和状态,你看可以理解为这仅仅是一个配置展示的方法,以供调用者根据当前的配置,自行决定是否需要重新构建客户端。</p>
*
* @param verifyConfig 待校验的配置
* @return 是否一致
*/
boolean verifyConfig(S3StorageClientConfig verifyConfig);
}

View File

@@ -0,0 +1,25 @@
package org.dromara.common.oss.s3.config;
/**
* 配置对象接口
*
* @param <T> 配置类型
* @param <B> 配置构建器类型
*
* @author 秋辞未寒
*/
public interface Config<T,B> {
/**
* 配置对象拷贝
* @return 拷贝后的新配置对象
*/
T copy();
/**
* 转为构建器对象
* @return 构建器对象
*/
B toBuilder();
}

View File

@@ -0,0 +1,57 @@
package org.dromara.common.oss.s3.config;
import lombok.*;
import org.dromara.common.oss.s3.enums.AccessPolicy;
import org.jspecify.annotations.NonNull;
import java.io.Serial;
import java.io.Serializable;
import java.util.Optional;
/**
* S3 ACL访问策略配置
*
* @author 秋辞未寒
*/
@Data
@Builder
@EqualsAndHashCode
public class S3AccessControlPolicyConfig implements Config<S3AccessControlPolicyConfig,S3AccessControlPolicyConfig.S3AccessControlPolicyConfigBuilder>,Serializable {
@Serial
private static final long serialVersionUID = 1L;
public static final S3AccessControlPolicyConfig DEFAULT = S3AccessControlPolicyConfig.builder().build();
/**
* 是否启用ACL
*/
private boolean enabled;
/**
* 访问策略
*/
private AccessPolicy accessPolicy;
public boolean enabled() {
return enabled;
}
public @NonNull AccessPolicy accessPolicy() {
return Optional.ofNullable(accessPolicy)
.orElse(AccessPolicy.PRIVATE);
}
@Override
public S3AccessControlPolicyConfig copy() {
return toBuilder().build();
}
@Override
public S3AccessControlPolicyConfigBuilder toBuilder() {
return builder()
.enabled(enabled)
.accessPolicy(accessPolicy);
}
}

View File

@@ -1,58 +0,0 @@
package org.dromara.common.oss.s3.config;
import org.dromara.common.oss.s3.enums.AccessPolicy;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import java.io.Serial;
import java.io.Serializable;
import java.util.Optional;
/**
* ACL访问策略配置
*
* @author 秋辞未寒
*/
@RequiredArgsConstructor
@Builder
@EqualsAndHashCode
public class S3AclConfig implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 是否启用ACL
*/
private final boolean enabled;
/**
* 访问策略
*/
private final AccessPolicy accessPolicy;
public boolean enabled() {
return enabled;
}
public Optional<AccessPolicy> accessPolicy() {
return Optional.ofNullable(accessPolicy);
}
/**
* 复制ACL访问策略配置对象
*/
public static S3AclConfig copy(S3AclConfig config) {
return toBuilder(config).build();
}
/**
* 转为ACL访问策略配置构建器对象
*/
public static S3AclConfigBuilder toBuilder(S3AclConfig config) {
return builder()
.enabled(config.enabled())
.accessPolicy(config.accessPolicy().orElse(null));
}
}

View File

@@ -0,0 +1,67 @@
package org.dromara.common.oss.s3.config;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import java.io.Serial;
import java.io.Serializable;
/**
* S3 异步执行器配置
*
* @author 秋辞未寒
*/
@RequiredArgsConstructor
@Builder
@EqualsAndHashCode
public class S3AsyncExecutorConfig implements Config<S3AsyncExecutorConfig,S3AsyncExecutorConfig.S3AsyncExecutorConfigBuilder>,Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 默认核心线程数 = 当前处理器核心数
*/
public static final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();
public static final S3AsyncExecutorConfig DEFAULT = S3AsyncExecutorConfig.builder().build();
/**
* 是否启用虚拟线程
*/
private boolean enabledVirtualThread = false;
/**
* 核心线程数
*
* 默认为当前CPU核心数该配置项在配置了虚拟线程后会失效
*/
private int corePoolSize = DEFAULT_CORE_POOL_SIZE;
/**
* 是否启用虚拟线程
*/
public boolean enabledVirtualThread() {
return enabledVirtualThread;
}
/**
* 核心线程数
*/
public int corePoolSize() {
return corePoolSize;
}
@Override
public S3AsyncExecutorConfig copy() {
return toBuilder().build();
}
@Override
public S3AsyncExecutorConfigBuilder toBuilder() {
return builder()
.enabledVirtualThread(enabledVirtualThread)
.corePoolSize(corePoolSize);
}
}

View File

@@ -1,12 +1,12 @@
package org.dromara.common.oss.s3.config;
import org.dromara.common.oss.s3.builder.CloudServiceBucketUrlBuilder;
import org.dromara.common.oss.s3.builder.MinioBucketUrlBuilder;
import org.dromara.common.oss.s3.exception.S3StorageException;
import org.dromara.common.oss.s3.util.BucketUrlUtil;
import cn.hutool.http.HttpUtil;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import org.dromara.common.oss.s3.exception.S3StorageException;
import org.dromara.common.oss.s3.util.BucketUrlUtil;
import org.jspecify.annotations.NonNull;
import software.amazon.awssdk.regions.Region;
import java.io.Serial;
@@ -21,7 +21,7 @@ import java.util.Optional;
@RequiredArgsConstructor
@Builder
@EqualsAndHashCode
public class S3StorageClientConfig implements Serializable {
public class S3StorageClientConfig implements Config<S3StorageClientConfig, S3StorageClientConfig.S3StorageClientConfigBuilder>, Serializable {
@Serial
private static final long serialVersionUID = 1L;
@@ -74,7 +74,12 @@ public class S3StorageClientConfig implements Serializable {
/**
* ACL访问策略配置
*/
private final S3AclConfig aclConfig;
private final S3AccessControlPolicyConfig accessControlPolicyConfig;
/**
* 异步调度池配置
*/
private final S3AsyncExecutorConfig asyncExecutorConfig;
/**
* 访问端点
@@ -89,6 +94,7 @@ public class S3StorageClientConfig implements Serializable {
public Optional<String> domain() {
return Optional.ofNullable(domain);
}
/**
* 是否使用HTTPS协议
*/
@@ -141,8 +147,17 @@ public class S3StorageClientConfig implements Serializable {
/**
* ACL访问策略配置
*/
public Optional<S3AclConfig> aclConfig() {
return Optional.ofNullable(aclConfig);
public @NonNull S3AccessControlPolicyConfig accessControlPolicyConfig() {
return Optional.ofNullable(accessControlPolicyConfig)
.orElse(S3AccessControlPolicyConfig.DEFAULT);
}
/**
* ACL访问策略配置
*/
public @NonNull S3AsyncExecutorConfig asyncExecutorConfig() {
return Optional.ofNullable(asyncExecutorConfig)
.orElse(S3AsyncExecutorConfig.DEFAULT);
}
/**
@@ -150,11 +165,11 @@ public class S3StorageClientConfig implements Serializable {
*
* @return 访问站点URL地址
*/
public String getEndpointUrl(){
public String getEndpointUrl() {
String endpoint = endpoint()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.of("endpoint is not configured."));
return BucketUrlUtil.getDomainUrl(useHttps, endpoint);
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.form("endpoint is not configured."));
return BucketUrlUtil.rebuildUrlHeader(useHttps, endpoint);
}
/**
@@ -162,52 +177,63 @@ public class S3StorageClientConfig implements Serializable {
*
* @return 域名URL地址
*/
public String getDomainUrl(){
public String getDomainUrl() {
return domain()
.filter(s -> !s.isBlank())
// 如果已经配置了自定义域名,则优先使用域名
.map(s -> BucketUrlUtil.getDomainUrl(useHttps, s))
// 否则使用站点
.orElseGet(this::getEndpointUrl);
// 如果已经配置了自定义域名,则优先使用域名
// 检查携带协议头
.filter(s -> HttpUtil.isHttp(s) || HttpUtil.isHttps(s))
// 否则使用站点
.orElseGet(this::getEndpointUrl);
}
/**
* 获取桶URL地址
*
* @param isCloudService 是否是云服务商
* @return 桶URL地址
*/
public String getBucketUrl(boolean isCloudService){
// 如果是云服务商,则优先使用云服务商的
if (isCloudService) {
return CloudServiceBucketUrlBuilder.INSTANCE.build(this);
}
// 否则默认使用MinIO的
return MinioBucketUrlBuilder.INSTANCE.build(this);
public String getBucketUrl() {
// 如果未配置桶,则抛异常
String bucket = bucket()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.form("bucket is not configured."));
// 如果已经配置了自定义域名,则优先使用域名
String url = domain()
// 检查携带协议头
.filter(s -> HttpUtil.isHttp(s) || HttpUtil.isHttps(s))
// 否则使用站点
.orElseGet(() ->
endpoint()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.form("endpoint is not configured."))
);
// 根据是否使用路径风格配置项决定存储桶的URL风格
return usePathStyleAccess ? BucketUrlUtil.getPathStyleBucketUrl(useHttps, url, bucket) : BucketUrlUtil.getSiteStyleBucketUrl(useHttps, url, bucket);
}
/**
* 复制S3存储客户端配置对象
*/
public static S3StorageClientConfig copy(S3StorageClientConfig config) {
return toBuilder(config).build();
@Override
public S3StorageClientConfig copy() {
return toBuilder().build();
}
/**
* 转为S3存储客户端配置构建器对象
*/
public static S3StorageClientConfigBuilder toBuilder(S3StorageClientConfig config) {
S3StorageClientConfigBuilder builder = builder()
.endpoint(config.endpoint().orElse(null))
.domain(config.domain().orElse(null))
.useHttps(config.useHttps())
.usePathStyleAccess(config.usePathStyleAccess())
.accessKey(config.accessKey().orElse(null))
.secretKey(config.secretKey().orElse(null))
.bucket(config.bucket().orElse(null))
.region(config.region().orElse(null))
.prefix(config.prefix().orElse(null));
config.aclConfig().ifPresent(s3AclConfig -> builder.aclConfig(S3AclConfig.copy(s3AclConfig)));
return builder;
@Override
public S3StorageClientConfigBuilder toBuilder() {
return builder()
.endpoint(endpoint)
.domain(domain)
.useHttps(useHttps)
.usePathStyleAccess(usePathStyleAccess)
.accessKey(accessKey)
.secretKey(secretKey)
.bucket(bucket)
.region(region)
.prefix(prefix)
.accessControlPolicyConfig(accessControlPolicyConfig().copy())
.asyncExecutorConfig(asyncExecutorConfig().copy());
}
}

View File

@@ -31,7 +31,7 @@ public record GetObjectResult(
Map<String, String> metadata
) {
public static GetObjectResult of(String key, String eTag, LocalDateTime lastModified, long size
public static GetObjectResult form(String key, String eTag, LocalDateTime lastModified, long size
, String contentType, String contentDisposition, String contentRange, String contentEncoding, String contentLanguage
, Map<String, String> metadata) {
return new GetObjectResult(key, eTag, lastModified, size, contentType, contentDisposition, contentRange, contentEncoding, contentLanguage, metadata);

View File

@@ -76,19 +76,4 @@ public class Options {
public static Options builder() {
return new Options();
}
/**
* 复制一个新的可选项对象
*
* @param options 可选项对象
* @return 新的可选项对象
*/
public static Options copy(Options options) {
return builder()
.setLength(options.getLength())
.setMd5Digest(options.getMd5Digest())
.setContentType(options.getContentType())
.setMetadata(options.getMetadata())
.setTransferListeners(options.getTransferListeners());
}
}

View File

@@ -16,7 +16,7 @@ public record PutObjectResult(
long size
) {
public static PutObjectResult of(String url, String key, String eTag, long size) {
public static PutObjectResult form(String url, String key, String eTag, long size) {
return new PutObjectResult(url, key, eTag, size);
}

View File

@@ -28,19 +28,19 @@ public class S3StorageException extends RuntimeException {
super(message, cause, enableSuppression, writableStackTrace);
}
public static S3StorageException of(String message) {
public static S3StorageException form(String message) {
return new S3StorageException(message);
}
public static S3StorageException of(String message, Throwable cause) {
public static S3StorageException form(String message, Throwable cause) {
return new S3StorageException(message, cause);
}
public static S3StorageException of(Throwable cause) {
public static S3StorageException form(Throwable cause) {
return new S3StorageException(cause);
}
public static S3StorageException of(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
public static S3StorageException form(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
return new S3StorageException(message, cause, enableSuppression, writableStackTrace);
}

View File

@@ -5,12 +5,12 @@ import org.dromara.common.core.constant.CacheNames;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.oss.constant.OssConstant;
import org.dromara.common.redis.utils.CacheUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.oss.s3.builder.DefaultS3StorageClientBuilder;
import org.dromara.common.oss.s3.client.DefaultS3StorageClientImpl;
import org.dromara.common.oss.s3.client.S3StorageClient;
import org.dromara.common.oss.s3.config.S3StorageClientConfig;
import org.dromara.common.oss.s3.exception.S3StorageException;
import org.dromara.common.redis.utils.CacheUtils;
import org.dromara.common.redis.utils.RedisUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,7 +32,7 @@ public class S3StorageClientFactory {
// 获取redis 默认类型
String configKey = RedisUtils.getCacheObject(OssConstant.DEFAULT_CONFIG_KEY);
if (StringUtils.isEmpty(configKey)) {
throw S3StorageException.of("文件存储服务类型无法找到!");
throw S3StorageException.form("文件存储服务类型无法找到!");
}
return instance(configKey);
}
@@ -51,10 +51,10 @@ public class S3StorageClientFactory {
private static S3StorageClient instanceCache(String configKey) {
String json = CacheUtils.get(CacheNames.SYS_OSS_CONFIG, configKey);
if (json == null) {
throw S3StorageException.of("系统异常, '" + configKey + "'配置信息不存在!");
throw S3StorageException.form("系统异常, '" + configKey + "'配置信息不存在!");
}
S3StorageClientConfig config = JsonUtils.parseObject(json, S3StorageClientConfig.class);
return DefaultS3StorageClientBuilder.INSTANCE.build(config);
return new DefaultS3StorageClientImpl(config);
}
/**

View File

@@ -40,7 +40,7 @@ public class OutputStreamDownloadSubscriber implements Consumer<ByteBuffer>, Aut
channel.write(byteBuffer);
}
} catch (IOException e) {
throw S3StorageException.of(e);
throw S3StorageException.form(e);
}
}

View File

@@ -2,6 +2,7 @@ package org.dromara.common.oss.s3.util;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.dromara.common.core.utils.StringUtils;
/**
* 桶链接工具类
@@ -25,13 +26,13 @@ public class BucketUrlUtil {
private static final String SITE_STYLE_HTTPS_FORMATE = "https://%s.%s";
/**
* 获取域名地址 例https://s3examples.com
* 重建链接协议头将IP、域名、站点的协议头改成HTTP或者HTTPS
*
* @param isHttps 是否为HTTP
* @param base 基础地址可以是IP、站点或者域名
* @param isHttps 是否为HTTP
* @param base 基础地址可以是IP、站点或者域名
* @return 域名地址
*/
public static String getDomainUrl(boolean isHttps, String base) {
public static String rebuildUrlHeader(boolean isHttps, String base) {
String baseUrl = removeHttpProtocolHeader(base);
if (isHttps) {
return HTTPS_PROTOCOL_HEADER + baseUrl;
@@ -78,11 +79,10 @@ public class BucketUrlUtil {
* @return 移除HTTP/HTTPS协议头后的地址
*/
public static String removeHttpProtocolHeader(String url) {
String s = url.toLowerCase();
if (s.startsWith(HTTP_PROTOCOL_HEADER) || s.startsWith(HTTPS_PROTOCOL_HEADER)) {
return s.replace(HTTP_PROTOCOL_HEADER, EMPTY_STRING)
.replace(HTTPS_PROTOCOL_HEADER, EMPTY_STRING);
if (StringUtils.startsWithIgnoreCase(url, HTTP_PROTOCOL_HEADER) || StringUtils.startsWithIgnoreCase(url, HTTPS_PROTOCOL_HEADER)) {
return url.replace(HTTP_PROTOCOL_HEADER, EMPTY_STRING)
.replace(HTTPS_PROTOCOL_HEADER, EMPTY_STRING);
}
return s;
return url;
}
}