AWS S3 协议 - 简易实现
在上一篇文章中,我们通过抓包的方式直观地学习了 S3 协议。本文将在此基础上更进一步,使用 Go 语言和 Gin 框架,从零开始编写一个兼容 S3 协议的、基于本地文件系统的简单对象存储服务,以加深对协议细节的理解。
本文将使用本地文件系统作为存储后端,来实现存储桶(Bucket)和对象(Object)的核心操作,这与我们在上一篇文章中用来测试的 Minio 的早期工作方式有相似之处。
技术选型
- Go 语言: 一种由 Google 开发的静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。
- Gin 框架: 一个用 Go 语言编写的高性能 HTTP Web 框架。它提供了简洁的 API,足以快速构建我们的 S3 服务路由。
- 本地文件系统: 为了简化实现,我们直接使用本地磁盘目录来模拟 S3 的存储桶和对象结构。一个目录就是一个存储桶,目录里的文件和子目录就是对象。
项目设置与数据结构
在开始编码之前,我们首先需要定义服务的配置以及 S3 API 响应所必需的 XML 数据结构。
配置文件与加载
我们创建一个 config.yaml
文件来管理服务的端口、数据存储目录以及 S3 的访问密钥。
config.yaml
server:
port: 9000
storage:
directory: D:\S3-Storage
s3:
accessKey: myaccesskey
secretKey: mysecretkey
server:
port: 9000
storage:
directory: D:\S3-Storage
s3:
accessKey: myaccesskey
secretKey: mysecretkey
为了在 Go 程序中加载和使用这些配置,我们定义相应的结构体,并编写一个函数来解析这个 YAML 文件。
代码:配置加载
// Config 结构体用于映射 config.yaml 文件的内容
type Config struct {
Server ServerConfig `yaml:"server"`
Storage StorageConfig `yaml:"storage"`
S3 S3Config `yaml:"s3"`
}
type ServerConfig struct {
Port int `yaml:"port"`
}
type StorageConfig struct {
Directory string `yaml:"directory"`
}
type S3Config struct {
AccessKey string `yaml:"accessKey"`
SecretKey string `yaml:"secretKey"`
}
// loadConfig 函数负责读取并解析 YAML 配置文件
func loadConfig(filename string) (*Config, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("error reading config file: %v", err)
}
var config Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("error parsing config file: %v", err)
}
return &config, nil
}
// Config 结构体用于映射 config.yaml 文件的内容
type Config struct {
Server ServerConfig `yaml:"server"`
Storage StorageConfig `yaml:"storage"`
S3 S3Config `yaml:"s3"`
}
type ServerConfig struct {
Port int `yaml:"port"`
}
type StorageConfig struct {
Directory string `yaml:"directory"`
}
type S3Config struct {
AccessKey string `yaml:"accessKey"`
SecretKey string `yaml:"secretKey"`
}
// loadConfig 函数负责读取并解析 YAML 配置文件
func loadConfig(filename string) (*Config, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("error reading config file: %v", err)
}
var config Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("error parsing config file: %v", err)
}
return &config, nil
}
S3 响应的 XML 结构
S3 协议的 API 响应大多采用 XML 格式。我们需要预先定义好与这些 XML 格式对应的 Go 结构体,以便在处理请求后能方便地生成正确的响应。
代码:XML 结构体
// ListAllMyBucketsResult 对应 "列出所有存储桶" 操作的 XML 响应
type ListAllMyBucketsResult struct {
XMLName xml.Name `xml:"ListAllMyBucketsResult"`
Owner Owner `xml:"Owner"`
Buckets Buckets `xml:"Buckets"`
}
type Owner struct {
ID string `xml:"ID"`
DisplayName string `xml:"DisplayName"`
}
type Buckets struct {
Bucket []Bucket `xml:"Bucket"`
}
type Bucket struct {
Name string `xml:"Name"`
CreationDate time.Time `xml:"CreationDate"`
}
// ListBucketResult 对应 "列出存储桶内对象" 操作的 XML 响应
type ListBucketResult struct {
XMLName xml.Name `xml:"ListBucketResult"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix"`
Marker string `xml:"Marker"`
Delimiter string `xml:"Delimiter"`
Contents []Contents `xml:"Contents"`
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes"`
}
type Contents struct {
Key string `xml:"Key"`
LastModified time.Time `xml:"LastModified"`
Size int64 `xml:"Size"`
StorageClass string `xml:"StorageClass"`
}
// CommonPrefixes 用于表示 S3 中的 "文件夹"
type CommonPrefixes struct {
Prefix string `xml:"Prefix"`
}
// ListAllMyBucketsResult 对应 "列出所有存储桶" 操作的 XML 响应
type ListAllMyBucketsResult struct {
XMLName xml.Name `xml:"ListAllMyBucketsResult"`
Owner Owner `xml:"Owner"`
Buckets Buckets `xml:"Buckets"`
}
type Owner struct {
ID string `xml:"ID"`
DisplayName string `xml:"DisplayName"`
}
type Buckets struct {
Bucket []Bucket `xml:"Bucket"`
}
type Bucket struct {
Name string `xml:"Name"`
CreationDate time.Time `xml:"CreationDate"`
}
// ListBucketResult 对应 "列出存储桶内对象" 操作的 XML 响应
type ListBucketResult struct {
XMLName xml.Name `xml:"ListBucketResult"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix"`
Marker string `xml:"Marker"`
Delimiter string `xml:"Delimiter"`
Contents []Contents `xml:"Contents"`
CommonPrefixes []CommonPrefixes `xml:"CommonPrefixes"`
}
type Contents struct {
Key string `xml:"Key"`
LastModified time.Time `xml:"LastModified"`
Size int64 `xml:"Size"`
StorageClass string `xml:"StorageClass"`
}
// CommonPrefixes 用于表示 S3 中的 "文件夹"
type CommonPrefixes struct {
Prefix string `xml:"Prefix"`
}
S3 签名认证 (Authentication)
S3 协议的安全性基石是其签名认证机制。任何发送到 S3 服务的请求(匿名公开访问除外)都必须包含一个Authorization
请求头。服务端通过这个请求头中的信息来验证请求者的身份以及请求在传输过程中是否被篡改。
其核心在于,客户端使用自己的SecretKey
对请求的特定部分(如 HTTP 方法、URI、请求头等)进行 HMAC-SHA256 哈希运算,生成一个签名。服务端接收到请求后,用同样的方式和存储在服务端的SecretKey
生成签名,并与客户端传来的签名进行比对。如果一致,则认证通过。
我们将这个逻辑封装在 Gin 的一个中间件中,以便对所有需要保护的 API 路由生效。
代码:认证中间件 authMiddleware
// authMiddleware 是一个 Gin 中间件,用于处理 S3 的签名认证
func authMiddleware(config *Config) gin.HandlerFunc {
return func(c *gin.Context) {
// 从请求头中获取 S3 客户端发送的认证信息
auth := c.GetHeader("Authorization")
if auth == "" {
c.XML(http.StatusUnauthorized, gin.H{"error": "Authorization required"})
c.Abort()
return
}
// 解析 Authorization 请求头,其格式通常为 "AWS AccessKey:Signature"
parts := strings.Fields(auth)
if len(parts) != 2 || !strings.HasPrefix(parts[0], "AWS") {
c.XML(http.StatusUnauthorized, gin.H{"error": "Invalid Authorization header format"})
c.Abort()
return
}
creds := strings.Split(parts[1], ":")
if len(creds) != 2 {
c.XML(http.StatusUnauthorized, gin.H{"error": "Invalid credentials format"})
c.Abort()
return
}
accessKey := creds[0]
signature := creds[1]
// 验证 AccessKey 是否为服务端配置的合法用户
if accessKey != config.S3.AccessKey {
c.XML(http.StatusUnauthorized, gin.H{"error": "Invalid access key"})
c.Abort()
return
}
// 构建服务端用于计算签名的 "待签名字符串"
stringToSign := c.Request.Method + "\n" +
c.Request.URL.Path + "\n" +
c.Request.URL.RawQuery
// 使用服务端的 SecretKey 通过 HMAC-SHA256 算法计算签名
mac := hmac.New(sha256.New, []byte(config.S3.SecretKey))
mac.Write([]byte(stringToSign))
expectedSignature := base64.StdEncoding.EncodeToString(mac.Sum(nil))
// 对比服务端计算的签名和客户端发送的签名
if signature != expectedSignature {
c.XML(http.StatusUnauthorized, gin.H{"error": "Invalid signature"})
c.Abort()
return
}
// 签名验证通过,继续处理请求
c.Next()
}
}
// authMiddleware 是一个 Gin 中间件,用于处理 S3 的签名认证
func authMiddleware(config *Config) gin.HandlerFunc {
return func(c *gin.Context) {
// 从请求头中获取 S3 客户端发送的认证信息
auth := c.GetHeader("Authorization")
if auth == "" {
c.XML(http.StatusUnauthorized, gin.H{"error": "Authorization required"})
c.Abort()
return
}
// 解析 Authorization 请求头,其格式通常为 "AWS AccessKey:Signature"
parts := strings.Fields(auth)
if len(parts) != 2 || !strings.HasPrefix(parts[0], "AWS") {
c.XML(http.StatusUnauthorized, gin.H{"error": "Invalid Authorization header format"})
c.Abort()
return
}
creds := strings.Split(parts[1], ":")
if len(creds) != 2 {
c.XML(http.StatusUnauthorized, gin.H{"error": "Invalid credentials format"})
c.Abort()
return
}
accessKey := creds[0]
signature := creds[1]
// 验证 AccessKey 是否为服务端配置的合法用户
if accessKey != config.S3.AccessKey {
c.XML(http.StatusUnauthorized, gin.H{"error": "Invalid access key"})
c.Abort()
return
}
// 构建服务端用于计算签名的 "待签名字符串"
stringToSign := c.Request.Method + "\n" +
c.Request.URL.Path + "\n" +
c.Request.URL.RawQuery
// 使用服务端的 SecretKey 通过 HMAC-SHA256 算法计算签名
mac := hmac.New(sha256.New, []byte(config.S3.SecretKey))
mac.Write([]byte(stringToSign))
expectedSignature := base64.StdEncoding.EncodeToString(mac.Sum(nil))
// 对比服务端计算的签名和客户端发送的签名
if signature != expectedSignature {
c.XML(http.StatusUnauthorized, gin.H{"error": "Invalid signature"})
c.Abort()
return
}
// 签名验证通过,继续处理请求
c.Next()
}
}
存储桶 (Bucket) 操作实现
存储桶在我们的实现中直接对应文件系统中的目录。我们将在main
函数中定义处理这些操作的路由。
列出所有存储桶 (GET /
)
此操作通过读取配置中指定的根存储目录下的所有子目录来实现。
代码:main.go
中添加路由
// 在 main 函数中
r.GET("/", func(c *gin.Context) {
// 读取根存储目录下的所有子目录
dirs, err := os.ReadDir(config.Storage.Directory)
if err != nil {
// 如果根目录不存在,则创建它并返回空列表
if os.IsNotExist(err) {
_ = os.MkdirAll(config.Storage.Directory, os.ModePerm)
dirs = []os.DirEntry{}
} else {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to list buckets"})
return
}
}
// 填充 S3 标准的 XML 响应结构
result := ListAllMyBucketsResult{
Owner: Owner{ID: "123", DisplayName: "s3-fs"},
Buckets: Buckets{},
}
for _, dir := range dirs {
if dir.IsDir() {
info, _ := dir.Info()
result.Buckets.Bucket = append(result.Buckets.Bucket, Bucket{
Name: dir.Name(),
CreationDate: info.ModTime(),
})
}
}
c.XML(http.StatusOK, result)
})
// 在 main 函数中
r.GET("/", func(c *gin.Context) {
// 读取根存储目录下的所有子目录
dirs, err := os.ReadDir(config.Storage.Directory)
if err != nil {
// 如果根目录不存在,则创建它并返回空列表
if os.IsNotExist(err) {
_ = os.MkdirAll(config.Storage.Directory, os.ModePerm)
dirs = []os.DirEntry{}
} else {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to list buckets"})
return
}
}
// 填充 S3 标准的 XML 响应结构
result := ListAllMyBucketsResult{
Owner: Owner{ID: "123", DisplayName: "s3-fs"},
Buckets: Buckets{},
}
for _, dir := range dirs {
if dir.IsDir() {
info, _ := dir.Info()
result.Buckets.Bucket = append(result.Buckets.Bucket, Bucket{
Name: dir.Name(),
CreationDate: info.ModTime(),
})
}
}
c.XML(http.StatusOK, result)
})
创建存储桶 (PUT /{bucket}
)
创建一个存储桶等同于在根存储目录下创建一个新的子目录。
代码:main.go
中添加路由
// 在 main 函数中
r.PUT("/:bucket", func(c *gin.Context) {
bucket := c.Param("bucket")
bucketPath := filepath.Join(config.Storage.Directory, bucket)
// 在文件系统中创建一个代表存储桶的目录
err := os.MkdirAll(bucketPath, os.ModePerm)
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to create bucket"})
return
}
c.Status(http.StatusOK)
})
// 在 main 函数中
r.PUT("/:bucket", func(c *gin.Context) {
bucket := c.Param("bucket")
bucketPath := filepath.Join(config.Storage.Directory, bucket)
// 在文件系统中创建一个代表存储桶的目录
err := os.MkdirAll(bucketPath, os.ModePerm)
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to create bucket"})
return
}
c.Status(http.StatusOK)
})
删除存储桶 (DELETE /{bucket}
)
删除存储桶会级联删除对应的目录及其所有内容。
代码:main.go
中添加路由
// 在 main 函数中
r.DELETE("/:bucket", func(c *gin.Context) {
bucket := c.Param("bucket")
bucketPath := filepath.Join(config.Storage.Directory, bucket)
// 删除目录及其所有内容
err := os.RemoveAll(bucketPath)
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to delete bucket"})
return
}
c.Status(http.StatusNoContent)
})
// 在 main 函数中
r.DELETE("/:bucket", func(c *gin.Context) {
bucket := c.Param("bucket")
bucketPath := filepath.Join(config.Storage.Directory, bucket)
// 删除目录及其所有内容
err := os.RemoveAll(bucketPath)
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to delete bucket"})
return
}
c.Status(http.StatusNoContent)
})
对象 (Object) 操作实现
对象在我们的实现中对应存储桶目录下的文件或子目录。
列出对象 (GET /{bucket}
)
这个 API 需要处理prefix
和delimiter
参数,以支持按层级列出对象,这是实现 S3 客户端中“文件夹”浏览功能的关键。
代码:main.go
中添加路由
// 在 main 函数中
r.GET("/:bucket", func(c *gin.Context) {
bucket := c.Param("bucket")
prefix := c.Query("prefix")
delimiter := c.Query("delimiter")
result := ListBucketResult{
Name: bucket,
Prefix: prefix,
Delimiter: delimiter,
}
dirPath := filepath.Join(config.Storage.Directory, bucket, prefix)
entries, err := os.ReadDir(dirPath)
if err != nil {
c.XML(http.StatusNotFound, gin.H{"error": "Directory not found"})
return
}
for _, entry := range entries {
name := entry.Name()
// 构造对象在 S3 中的完整 Key
fullPath := filepath.ToSlash(filepath.Join(prefix, name))
if entry.IsDir() {
// 如果是目录,则视为一个 CommonPrefix(文件夹)
result.CommonPrefixes = append(result.CommonPrefixes, CommonPrefixes{
Prefix: fullPath + "/",
})
} else {
// 如果是文件,则视为一个 Contents(对象)
info, _ := entry.Info()
result.Contents = append(result.Contents, Contents{
Key: fullPath,
LastModified: info.ModTime(),
Size: info.Size(),
StorageClass: "STANDARD",
})
}
}
c.XML(http.StatusOK, result)
})
// 在 main 函数中
r.GET("/:bucket", func(c *gin.Context) {
bucket := c.Param("bucket")
prefix := c.Query("prefix")
delimiter := c.Query("delimiter")
result := ListBucketResult{
Name: bucket,
Prefix: prefix,
Delimiter: delimiter,
}
dirPath := filepath.Join(config.Storage.Directory, bucket, prefix)
entries, err := os.ReadDir(dirPath)
if err != nil {
c.XML(http.StatusNotFound, gin.H{"error": "Directory not found"})
return
}
for _, entry := range entries {
name := entry.Name()
// 构造对象在 S3 中的完整 Key
fullPath := filepath.ToSlash(filepath.Join(prefix, name))
if entry.IsDir() {
// 如果是目录,则视为一个 CommonPrefix(文件夹)
result.CommonPrefixes = append(result.CommonPrefixes, CommonPrefixes{
Prefix: fullPath + "/",
})
} else {
// 如果是文件,则视为一个 Contents(对象)
info, _ := entry.Info()
result.Contents = append(result.Contents, Contents{
Key: fullPath,
LastModified: info.ModTime(),
Size: info.Size(),
StorageClass: "STANDARD",
})
}
}
c.XML(http.StatusOK, result)
})
上传对象 (PUT /{bucket}/*key
)
上传操作会将请求体中的数据写入到本地文件系统中。我们还需要特别处理 S3 的aws-chunked
编码格式,这在上一篇抓包文章中可以看到。
代码:main.go
中添加路由
// 在 main 函数中
r.PUT("/:bucket/*key", func(c *gin.Context) {
bucket := c.Param("bucket")
key := strings.TrimPrefix(c.Param("key"), "/")
filePath := filepath.Join(config.Storage.Directory, bucket, key)
// 确保父目录存在
os.MkdirAll(filepath.Dir(filePath), os.ModePerm)
file, err := os.Create(filePath)
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to create file"})
return
}
defer file.Close()
// S3 客户端可能会使用分块流式上传
if strings.Contains(c.GetHeader("x-amz-content-sha256"), "STREAMING-AWS4-HMAC-SHA256-PAYLOAD") {
data, err := readChunkedBody(c.Request.Body)
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to read chunked data"})
return
}
_, err = file.Write(data)
} else {
// 处理普通上传
_, err = io.Copy(file, c.Request.Body)
}
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to write file"})
return
}
c.Status(http.StatusOK)
})
// 在 main 函数中
r.PUT("/:bucket/*key", func(c *gin.Context) {
bucket := c.Param("bucket")
key := strings.TrimPrefix(c.Param("key"), "/")
filePath := filepath.Join(config.Storage.Directory, bucket, key)
// 确保父目录存在
os.MkdirAll(filepath.Dir(filePath), os.ModePerm)
file, err := os.Create(filePath)
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to create file"})
return
}
defer file.Close()
// S3 客户端可能会使用分块流式上传
if strings.Contains(c.GetHeader("x-amz-content-sha256"), "STREAMING-AWS4-HMAC-SHA256-PAYLOAD") {
data, err := readChunkedBody(c.Request.Body)
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to read chunked data"})
return
}
_, err = file.Write(data)
} else {
// 处理普通上传
_, err = io.Copy(file, c.Request.Body)
}
if err != nil {
c.XML(http.StatusInternalServerError, gin.H{"error": "Failed to write file"})
return
}
c.Status(http.StatusOK)
})
代码:解析分块上传
// readChunkedBody 读取并解析 S3 的分块上传请求体
func readChunkedBody(body io.Reader) ([]byte, error) {
var data []byte
reader := bufio.NewReader(body)
for {
// 读取块大小
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF { break }
return nil, err
}
var chunkSize int
// 解析十六进制的块大小
if _, err := fmt.Sscanf(strings.Split(line, ";")[0], "%x", &chunkSize); err != nil {
continue
}
if chunkSize == 0 { break }
// 读取指定大小的数据块
chunk := make([]byte, chunkSize)
if _, err := io.ReadFull(reader, chunk); err != nil {
return nil, err
}
data = append(data, chunk...)
// 读取并丢弃块末尾的 CRLF
reader.ReadString('\n')
}
return data, nil
}
// readChunkedBody 读取并解析 S3 的分块上传请求体
func readChunkedBody(body io.Reader) ([]byte, error) {
var data []byte
reader := bufio.NewReader(body)
for {
// 读取块大小
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF { break }
return nil, err
}
var chunkSize int
// 解析十六进制的块大小
if _, err := fmt.Sscanf(strings.Split(line, ";")[0], "%x", &chunkSize); err != nil {
continue
}
if chunkSize == 0 { break }
// 读取指定大小的数据块
chunk := make([]byte, chunkSize)
if _, err := io.ReadFull(reader, chunk); err != nil {
return nil, err
}
data = append(data, chunk...)
// 读取并丢弃块末尾的 CRLF
reader.ReadString('\n')
}
return data, nil
}
下载和删除对象
下载对象就是将文件内容作为 HTTP 响应体返回,而删除对象则是删除文件系统中的对应文件。
代码:下载对象 (GET /{bucket}/*key
)
// 在 main 函数中
r.GET("/:bucket/*key", func(c *gin.Context) {
bucket := c.Param("bucket")
key := strings.TrimPrefix(c.Param("key"), "/")
filePath := filepath.Join(config.Storage.Directory, bucket, key)
// Gin/Go 标准库提供了方便的文件服务功能
http.ServeFile(c.Writer, c.Request, filePath)
})
// 在 main 函数中
r.GET("/:bucket/*key", func(c *gin.Context) {
bucket := c.Param("bucket")
key := strings.TrimPrefix(c.Param("key"), "/")
filePath := filepath.Join(config.Storage.Directory, bucket, key)
// Gin/Go 标准库提供了方便的文件服务功能
http.ServeFile(c.Writer, c.Request, filePath)
})
代码:删除对象 (DELETE /{bucket}/*key
)
// 在 main 函数中
r.DELETE("/:bucket/*key", func(c *gin.Context) {
bucket := c.Param("bucket")
key := strings.TrimPrefix(c.Param("key"), "/")
filePath := filepath.Join(config.Storage.Directory, bucket, key)
err := os.Remove(filePath)
if err != nil {
c.XML(http.StatusNotFound, gin.H{"error": "File not found"})
return
}
c.Status(http.StatusNoContent)
})
// 在 main 函数中
r.DELETE("/:bucket/*key", func(c *gin.Context) {
bucket := c.Param("bucket")
key := strings.TrimPrefix(c.Param("key"), "/")
filePath := filepath.Join(config.Storage.Directory, bucket, key)
err := os.Remove(filePath)
if err != nil {
c.XML(http.StatusNotFound, gin.H{"error": "File not found"})
return
}
c.Status(http.StatusNoContent)
})
总结
通过以上步骤,我们用 Go 语言和 Gin 框架成功地实现了一个 S3 协议的子集,完成了一个能在本地运行的、基于文件系统的简单对象存储服务。尽管这个版本简化了认证机制,并且省略了多部分上传、版本控制等高级功能,但它比较清晰地展示了 S3 协议的核心工作流程。
我们可以使用在上一篇文章中介绍的 S3 Browser 等工具进行连接和测试。只需将 S3 Browser 的端点(Endpoint)指向127.0.0.1:9000
,并填入config.yaml
中定义的AccessKey
和SecretKey
,就可以像操作真正的 S3 一样操作我们自己编写的服务了。
评论
暂无评论,来发表第一条评论吧