- 2019-11-17
Golang MongoDB扩展包之高级...
前言
上篇文章主要讲解了MongoDB的CRUD操作,本篇文章讲下MongoDB的Aggregate、transaction、索引等特性。
Aggregate
聚合操作处理数据记录并返回计算结果. 聚合操作将来自多个文档的值分组在一起,并且可以对分组的数据执行各种操作以返回单个结果. MongoDB提供了三种执行聚合的方法: 聚合管道 , map-reduce函数和单一目的聚合方法,下面代码基于聚合管道。
数据库数据如下:
假设我们要统计状态为1的每个用户的分数,代码如下
type CountResult struct {
UID int64 `json:"uid" bson:"_id"`
Total float32 `json:"Total" bson:"total"`
}
func aggregate(ctx context.Context, db *mongo.Database) {
matchStage := bson.D{{"$match", bson.M{"status": 1}}}
groupStage := bson.D{{
"$group",
bson.D{
{"_id", "$uid"},
{"total", bson.D{{
"$sum", "$score"}}},
}}}
cursor, err := db.Collection(userCollection).Aggregate(ctx, mongo.Pipeline{matchStage, groupStage})
if err != nil {
log.Fatalln(err)
}
res := []CountResult{}
if err = cursor.All(ctx, &res); err != nil {
log.Fatalln(err)
}
log.Println(res)
}
结果如下:
当然,Aggregate也有第三个options参数,重要方法如下:
- SetAllowDiskUse
如果为真,该操作可以写入服务器上数据库目录路径的_tmp子目录中的临时文件。默认值为false。每个阶段管道限制100M的内存,若单节点管道超出极限,MongoDB产生错误。为了能够处理大型数据集,可设置 SetAllowDiskUse 为 true 为聚合管道节点把数据写入临时文件,以解决100M内存限制。 - SetHint
用于聚合的索引。这应该是作为字符串的索引名,或者作为文档的索引规范。该提示不适用于lookup和graphLookup聚合阶段。默认值为nil,这意味着不会发送任何提示。 - SetMaxTime
查询可以在服务器上运行的最大时间。默认值为nil,这意味着查询执行没有时间限制。
其它参数请参考:AggregateOptions
事务
type Author struct {
ID primitive.ObjectID `json:"id" bson:"_id,omitempty"`
Name string `json:"name"`
}
type Article struct {
ID primitive.ObjectID `json:"id" bson:"_id,omitempty"`
Title string `json:"title"`
AuthorID primitive.ObjectID `json:"author_id" bson:"author_id"`
}
func transaction(ctx context.Context, db *mongo.Database) {
authorCollection := "author"
articleCollection := "article"
err := db.Client().UseSession(ctx, func(sessionContext mongo.SessionContext) error {
if err := sessionContext.StartTransaction(); err != nil {
return err
}
authorID := primitive.NewObjectID()
author := Author{
ID: authorID,
Name: "aa",
}
if _, err := db.Collection(authorCollection).InsertOne(sessionContext, author); err != nil {
_ = sessionContext.AbortTransaction(sessionContext)
return err
}
article := Article{
ID: primitive.NewObjectID(),
Title: "test",
AuthorID: authorID,
}
if _, err := db.Collection(articleCollection).InsertOne(sessionContext, article); err != nil {
_ = sessionContext.AbortTransaction(sessionContext)
return err
}
return sessionContext.CommitTransaction(sessionContext)
})
if err != nil {
log.Fatalln(err)
}
}
如果你直接运行上面的代码,多半会出现下面的错误:
出现这个错误的原因是MongoDB不支持在单实例上执行事务,只能在副本集或分片上执行,关于此错的解释可以参考这里:transaction-error,也可以参考官网的文档:transactions
怎么解决呢?由于我使用的是docker,所以我的解决方法是把MongoDB实例改成副本集,其他方式请参考这里:convert-standalone-to-replica-set
只需要把根目录下的docker-compose的注释行打开就行
然后修改MongoDB的连接方式,增加副本集设置,名字跟docker设置的副本集名字一致为rs1。
func getClient() *mongo.Client {
clientOption := options.Client()
clientOption.ApplyURI("mongodb://test:test@127.0.0.1:27017")
clientOption.SetReplicaSet("rs1")
client, err := mongo.NewClient(clientOption)
if err != nil {
log.Fatal(err)
}
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
err = client.Connect(ctx)
if err != nil {
log.Fatal(err)
}
err = client.Ping(ctx, readpref.Primary())
if err != nil {
log.Fatal(err)
}
return client
}
这还没有完,参考官网的转换教程,还要进入容器初始化副本。
进入容器后,执行使用mongo命令连接MongoDB,进入MongoShell交互终端。然后执行命令rs.initiate()
初始化副本,结果如下表示成功了。
然后可以执行命令rs.status()
来查看副本的信息。在members数组中你能看到已经有一个副本了。
退出容器,执行上面的事务代码,你会发现又报错了,错误如下:
出现这个错误的原因是我们设置的连接超时为10秒钟,修改超时时间为30秒再试下,
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
修改后再次执行代码,又报错了
出现这个错的原因可以参考这里:error,error,一句话概括就是宿主机不能通过docker容器名直接连接容器,只有和容器在同一个网络才能连接。解决办法有两个:
1.把应用程序也构建成容器,和MongoDB处于同一docker网络下。2.修改MongoDB中的地址。在这里我选择第二种方式,简单粗暴一点。
首先进入容器,连接MongoDB进入mongoshell交互终端。
- 执行命令
cfg = rs.conf()
- 然后执行命令
cfg.members[0].host = "192.168.170.97:27017"
,注意,请把192.168.170.97换成你电脑的ip地址。 - 最后执行
rs.reconfig(cfg)
再次执行rs.status()
你会看到name已经改变了
退出容器,再次执行代码,你会发现又报错了。
是不是有种崩溃的感觉,哈哈,别着急,快结束了,这就是代码的乐趣。
为什么报这个错,因为MongoDB不支持在未存在的集合执行事务,要执行事务,该集合必须已经存在在数据库。所以,简单插入两条数据,确保集合存在。
authorID := primitive.NewObjectID()
author := Author{
ID: authorID,
Name: "aa",
}
if _, err := db.Collection(authorCollection).InsertOne(ctx, author); err != nil {
log.Fatalln(err)
}
article := Article{
ID: primitive.NewObjectID(),
Title: "test",
AuthorID: authorID,
}
if _, err := db.Collection(articleCollection).InsertOne(ctx, article); err != nil {
log.Fatalln(err)
}
再次执行事务代码,你会发现,数据插入成功了。所以通过本次一步一步调试,让大家了解了MongoDB的事务的一些坑。总结如下:
- MongoDB只能在分片或副本集上执行事务。
- MongoDB只能在已经存在的集合执行事务。
- 使用docker时必须和MongoDB处于同一docker网络下。
除此之外,MongoDB事务也有一些限制,具体请参考:Multi-document-Transactions
索引
func indexExample(ctx context.Context, db *mongo.Database) {
// 创建一个有过期时间的索引
indexOption := options.Index()
indexOption.SetName("uid_index")
indexOption.SetExpireAfterSeconds(600)
mod := mongo.IndexModel{
Keys: bson.M{"uid": 1},
Options: indexOption,
}
_, err := db.Collection(userCollection).Indexes().CreateOne(ctx, mod)
if err != nil {
log.Fatal(err)
}
// 创建一个文本索引
indexOption = options.Index()
indexOption.SetName("name_index")
mod = mongo.IndexModel{
Keys: bson.M{"name": "text"},
Options: indexOption,
}
_, err = db.Collection(userCollection).Indexes().CreateOne(ctx,mod)
if err != nil {
log.Fatal(err)
}
// 删除索引
_, err = db.Collection(userCollection).Indexes().DropOne(ctx,"uid_index")
if err != nil {
log.Fatal(err)
}
}
MongoDB支持的索引有很多种,我简单的写了有过期时间的索引,文本索引以及删除索引,其他索引可以参考:indexes
关于索引的设置可以参考这里:IndexOptions
尽管索引有很多好处,但是它也有一些限制,参考这里:index-limitations
今天的文章就到这吧,篇幅有点长,哈哈,简单记录下,当然这篇文章并不是最详细的,因为每一个模块都值得去深入研究。后续如果有时间在深入写点文章吧。
转载请注明出处,谢谢。
- 上一篇: Golang MongoDB扩展包之CRUD
- 下一篇: Json Web Token解析
评论一下