本篇博客记录下使用 AWS DynamoDB 过程中用到的一些知识和技术点,以备后需。
Amazon DynamoDB 是一个键/值和文档数据库,可以在任何规模的环境中提供个位数的毫秒级性能。它是一个完全托管的多区域多主数据库,具有适用于 Internet 规模的应用程序的内置安全性、备份和恢复和内存缓存。DynamoDB 每天可处理超过 10 万亿个请求,并可支持每秒超过 2000 万个请求的峰值。
使用 docker 容器本地部署 DynamoDB 实例
1
|
docker run -d -p 8000:8000 amazon/dynamodb-local
|
执行 aws 命令前,需要在 ~/.aws/credentials 中配置访问 DynamoDB 的 Access Key 和 Secret Key,如下所示:
1
2
3
|
[default]
aws_access_key_id = ${aws_access_key_id}
aws_secret_access_key = ${aws_access_key_id}
|
在 ~/.aws/config 中配置区域,如下所示:
1
2
|
[default]
region = ap-southeast-1
|
如果是本地启动的 DynamoDB 实例,只需任意一个合法的 DynamoDB Access Key 和 Secret Key 即可。
1、显示所有数据库表
1
|
aws dynamodb list-tables --endpoint-url http://localhost:8000
|
2、显示特定表信息
1
|
aws dynamodb describe-table --table-name xdhuxc --endpoint-url http://localhost:8000
|
3、创建表
1
2
3
4
5
6
7
8
|
aws dynamodb create-table \
--table-name xdhuxc \
--attribute-definitions \
AttributeName=key,AttributeType=S \
--key-schema \
AttributeName=key,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=10 \
--endpoint-url http://localhost:8000
|
4、插入 JSON 格式数据
1
2
3
4
5
6
7
|
aws dynamodb put-item --table-name xdhuxc \
--item '{ "key": { "S": "/myapp/study/hosts/codelieche" }, "value": {"S": "{\"domain\": \"www.codelieche.com\", \"ip\": \"192.168.1.101\"}" }}' \
--endpoint-url http://localhost:8000
aws dynamodb put-item --table-name xdhuxc \
--item '{ "key": { "S": "/myapp/study/hosts/codelieche2" }, "value": {"S": "{\"domain\": \"www.codelieche.com\", \"ip\": \"192.168.1.101\"}"}}' \
--endpoint-url http://localhost:8000
|
5、查看 dynamodb 数据库中所有数据
aws dynamodb scan --table-name xdhuxc --endpoint-url http://localhost:8000
可以使用 AWS 提供的 golang SDK 来操作 DynamoDB,实现在我们的程序内部对 DynamoDB 的调用。
注意,执行批量操作时,有数据量限制,每次只能增删 25 条数据,多余的将会报错(BatchLimit = 25)。
1、创建 dynamodb 客户端,对 DynamoDB 的操作都要使用该客户端来进行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
func NewDynamoDBClient(region string, endpoint string, disableSSL bool) (*dynamodb.DynamoDB, error) {
if region == "" {
sess, err := session.NewSession()
if err != nil {
return nil, err
}
metadata := ec2metadata.New(sess)
ec2Region, err := metadata.Region()
if err != nil {
return nil, fmt.Errorf("aws DynamoDB requires a region")
}
region = ec2Region
}
var c *aws.Config
// 使用环境变量中的 AccessKey 和 SecretKey
accessKey := os.Getenv("AWS_ACCESS_KEY_ID")
secretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
if accessKey != "" && secretKey != "" {
creds := credentials.NewStaticCredentials(accessKey, secretKey, "")
c = &aws.Config{
Region: aws.String(region),
Credentials: creds,
Endpoint: aws.String(endpoint),
DisableSSL: aws.Bool(disableSSL),
}
} else if accessKey != "" && secretKey != "" { // 使用配置文件中的 AccessKey 和 SecretKey
creds := credentials.NewStaticCredentials(accessKey, secretKey, "")
c = &aws.Config{
Region: aws.String(region),
Credentials: creds,
Endpoint: aws.String(endpoint),
DisableSSL: aws.Bool(disableSSL),
}
} else {
c = &aws.Config{ // 使用 ~/.aws/credentials 文件中的 AccessKey 和 SecretKey 。
Region: aws.String(region),
Endpoint: aws.String(endpoint),
DisableSSL: aws.Bool(disableSSL),
}
}
sess, err := session.NewSession(c)
if err != nil {
return nil, err
}
db := dynamodb.New(sess)
return db, nil
}
|
2、创建表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
func CreateTable(ddb *dynamodb.DynamoDB, tableName string) error {
itemInput := &dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String("key"),
AttributeType: aws.String("S"),
},
},
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("key"),
KeyType: aws.String("HASH"),
},
},
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
WriteCapacityUnits: aws.Int64(10),
},
TableName: aws.String(tableName),
}
_, err := ddb.CreateTable(itemInput)
if err != nil {
return err
}
return nil
}
|
3、向 dynamodb 表中插入键值对
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func insert2DynamoDB(ddb *dynamodb.DynamoDB, tableName string, key string, value string) error {
itemInput := &dynamodb.PutItemInput{
TableName: aws.String(tableName),
Item: map[string]*dynamodb.AttributeValue{
"key": { // 注意,建表时键名起为 key。
S: aws.String(key),
},
"value": {
S: aws.String(value),
},
},
}
if _, err := ddb.PutItem(itemInput); err != nil {
return err
}
return nil
}
|
4、向 dynamodb 表中批量插入键值数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
func batchWrite2DynamoDB(ddb *dynamodb.DynamoDB, tableName string, data map[string]string) error {
requestItems := make(map[string][]*dynamodb.WriteRequest)
length := len(data)
var writeRequests []*dynamodb.WriteRequest
count := 0
writeRequests = []*dynamodb.WriteRequest{}
for key, value := range data {// 键值数据
var writeRequest *dynamodb.WriteRequest
writeRequest = &dynamodb.WriteRequest{
PutRequest: &dynamodb.PutRequest{
Item: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(key),
},
"value": {
S: aws.String(value),
},
},
},
}
writeRequests = append(writeRequests, writeRequest)
count = count + 1
if (count % utils.BatchLimit == 0) || (count >= length) {
requestItems[tableName] = writeRequests
batchInput := &dynamodb.BatchWriteItemInput{
RequestItems: requestItems,
}
if _, err := ddb.BatchWriteItem(batchInput); err != nil {
return err
}
writeRequests = []*dynamodb.WriteRequest{}
}
}
return nil
}
|
5、批量删除数据
此处是通过批量删除的方式删除了全部数据,但是保留了表结构,之所以没有采用删除表的操作,是因为线上环境,权限所限
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
func BatchDeleteItems(ddb *dynamodb.DynamoDB, tableName string) error {
errc := make(chan error, 1)
wg := sync.WaitGroup{}
scannedData, err := ddb.Scan(&dynamodb.ScanInput{TableName: aws.String(tableName)})
if err != nil {
return err
}
length := len(scannedData.Items)
wg.Add(int(math.Ceil((float64(length)) / float64(utils.BatchLimit))))
req := []*dynamodb.WriteRequest{}
for i, a := range scannedData.Items {
req = append(req, &dynamodb.WriteRequest{
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
"key": a["key"],
},
},
})
if (i+1) % utils.BatchLimit == 0 || i >= int(*scannedData.Count)-1 {
go func(reqChunk []*dynamodb.WriteRequest) {
defer wg.Done()
_, err := t.DdbClient.BatchWriteItem(&dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
t.TableName: reqChunk,
},
})
if err != nil {
errc <- err
}
}(req)
req = []*dynamodb.WriteRequest{}
}
}
go func() {
wg.Wait()
close(errc)
}()
return <-errc
}
|
6、删除数据表
1
2
3
4
5
6
7
8
9
10
|
func DeleteTable(ddb *dynamodb.DynamoDB, tableName string) error {
itemInput := &dynamodb.DeleteTableInput{
TableName: aws.String(tableName),
}
if _, err := ddb.DeleteTable(itemInput); err != nil {
return err
}
return nil
}
|
https://hub.docker.com/r/amazon/dynamodb-local
https://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/DynamoDBLocal.DownloadingAndRunning.html
https://docs.aws.amazon.com/sdk-for-go/api/service/dynamodb/