Golang MongoDB扩展包之高级...

胡大大 2020-08-03 16:11:06 1742 0 comments

前言

上篇文章主要讲解了MongoDB的CRUD操作,本篇文章讲下MongoDB的Aggregate、transaction、索引等特性。

Aggregate

聚合操作处理数据记录并返回计算结果. 聚合操作将来自多个文档的值分组在一起,并且可以对分组的数据执行各种操作以返回单个结果. MongoDB提供了三种执行聚合的方法: 聚合管道map-reduce函数和单一目的聚合方法,下面代码基于聚合管道。

数据库数据如下:
image.png
假设我们要统计状态为1的每个用户的分数,代码如下

type CountResult struct { UID int64 `json:"uid" bson:"_id"` Total float32 `json:"Total" bson:"total"` } func aggregate(ctx context.Context, db *mongo.Database) { matchStage := bson.D{{"$match", bson.M{"status": 1}}} groupStage := bson.D{{ "$group", bson.D{ {"_id", "$uid"}, {"total", bson.D{{ "$sum", "$score"}}}, }}} cursor, err := db.Collection(userCollection).Aggregate(ctx, mongo.Pipeline{matchStage, groupStage}) if err != nil { log.Fatalln(err) } res := []CountResult{} if err = cursor.All(ctx, &res); err != nil { log.Fatalln(err) } log.Println(res) }

结果如下:
image.png
当然,Aggregate也有第三个options参数,重要方法如下:

  • SetAllowDiskUse
    如果为真,该操作可以写入服务器上数据库目录路径的_tmp子目录中的临时文件。默认值为false。每个阶段管道限制100M的内存,若单节点管道超出极限,MongoDB产生错误。为了能够处理大型数据集,可设置 SetAllowDiskUse 为 true 为聚合管道节点把数据写入临时文件,以解决100M内存限制。
  • SetHint
    用于聚合的索引。这应该是作为字符串的索引名,或者作为文档的索引规范。该提示不适用于lookup和graphLookup聚合阶段。默认值为nil,这意味着不会发送任何提示。
  • SetMaxTime
    查询可以在服务器上运行的最大时间。默认值为nil,这意味着查询执行没有时间限制。

其它参数请参考:AggregateOptions

事务

type Author struct { ID primitive.ObjectID `json:"id" bson:"_id,omitempty"` Name string `json:"name"` } type Article struct { ID primitive.ObjectID `json:"id" bson:"_id,omitempty"` Title string `json:"title"` AuthorID primitive.ObjectID `json:"author_id" bson:"author_id"` } func transaction(ctx context.Context, db *mongo.Database) { authorCollection := "author" articleCollection := "article" err := db.Client().UseSession(ctx, func(sessionContext mongo.SessionContext) error { if err := sessionContext.StartTransaction(); err != nil { return err } authorID := primitive.NewObjectID() author := Author{ ID: authorID, Name: "aa", } if _, err := db.Collection(authorCollection).InsertOne(sessionContext, author); err != nil { _ = sessionContext.AbortTransaction(sessionContext) return err } article := Article{ ID: primitive.NewObjectID(), Title: "test", AuthorID: authorID, } if _, err := db.Collection(articleCollection).InsertOne(sessionContext, article); err != nil { _ = sessionContext.AbortTransaction(sessionContext) return err } return sessionContext.CommitTransaction(sessionContext) }) if err != nil { log.Fatalln(err) } }

如果你直接运行上面的代码,多半会出现下面的错误:
image.png
出现这个错误的原因是MongoDB不支持在单实例上执行事务,只能在副本集或分片上执行,关于此错的解释可以参考这里:transaction-error,也可以参考官网的文档:transactions
怎么解决呢?由于我使用的是docker,所以我的解决方法是把MongoDB实例改成副本集,其他方式请参考这里:convert-standalone-to-replica-set
只需要把根目录下的docker-compose的注释行打开就行
image.png
然后修改MongoDB的连接方式,增加副本集设置,名字跟docker设置的副本集名字一致为rs1。

func getClient() *mongo.Client { clientOption := options.Client() clientOption.ApplyURI("mongodb://test:test@127.0.0.1:27017") clientOption.SetReplicaSet("rs1") client, err := mongo.NewClient(clientOption) if err != nil { log.Fatal(err) } ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) err = client.Connect(ctx) if err != nil { log.Fatal(err) } err = client.Ping(ctx, readpref.Primary()) if err != nil { log.Fatal(err) } return client }

这还没有完,参考官网的转换教程,还要进入容器初始化副本。
image.png
进入容器后,执行使用mongo命令连接MongoDB,进入MongoShell交互终端。然后执行命令rs.initiate()初始化副本,结果如下表示成功了。
image.png
然后可以执行命令rs.status()来查看副本的信息。在members数组中你能看到已经有一个副本了。
image.png
退出容器,执行上面的事务代码,你会发现又报错了,错误如下:
image.png
出现这个错误的原因是我们设置的连接超时为10秒钟,修改超时时间为30秒再试下,

ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)

修改后再次执行代码,又报错了
image.png
出现这个错的原因可以参考这里:error,error,一句话概括就是宿主机不能通过docker容器名直接连接容器,只有和容器在同一个网络才能连接。解决办法有两个:
1.把应用程序也构建成容器,和MongoDB处于同一docker网络下。2.修改MongoDB中的地址。在这里我选择第二种方式,简单粗暴一点。
首先进入容器,连接MongoDB进入mongoshell交互终端。

  1. 执行命令 cfg = rs.conf()
  2. 然后执行命令cfg.members[0].host = "192.168.170.97:27017",注意,请把192.168.170.97换成你电脑的ip地址。
  3. 最后执行rs.reconfig(cfg)

再次执行rs.status()你会看到name已经改变了
image.png
退出容器,再次执行代码,你会发现又报错了。
image.png
是不是有种崩溃的感觉,哈哈,别着急,快结束了,这就是代码的乐趣。
为什么报这个错,因为MongoDB不支持在未存在的集合执行事务,要执行事务,该集合必须已经存在在数据库。所以,简单插入两条数据,确保集合存在。

authorID := primitive.NewObjectID() author := Author{ ID: authorID, Name: "aa", } if _, err := db.Collection(authorCollection).InsertOne(ctx, author); err != nil { log.Fatalln(err) } article := Article{ ID: primitive.NewObjectID(), Title: "test", AuthorID: authorID, } if _, err := db.Collection(articleCollection).InsertOne(ctx, article); err != nil { log.Fatalln(err) }

再次执行事务代码,你会发现,数据插入成功了。所以通过本次一步一步调试,让大家了解了MongoDB的事务的一些坑。总结如下:

  1. MongoDB只能在分片或副本集上执行事务。
  2. MongoDB只能在已经存在的集合执行事务。
  3. 使用docker时必须和MongoDB处于同一docker网络下。

除此之外,MongoDB事务也有一些限制,具体请参考:Multi-document-Transactions

索引

func indexExample(ctx context.Context, db *mongo.Database) { // 创建一个有过期时间的索引 indexOption := options.Index() indexOption.SetName("uid_index") indexOption.SetExpireAfterSeconds(600) mod := mongo.IndexModel{ Keys: bson.M{"uid": 1}, Options: indexOption, } _, err := db.Collection(userCollection).Indexes().CreateOne(ctx, mod) if err != nil { log.Fatal(err) } // 创建一个文本索引 indexOption = options.Index() indexOption.SetName("name_index") mod = mongo.IndexModel{ Keys: bson.M{"name": "text"}, Options: indexOption, } _, err = db.Collection(userCollection).Indexes().CreateOne(ctx,mod) if err != nil { log.Fatal(err) } // 删除索引 _, err = db.Collection(userCollection).Indexes().DropOne(ctx,"uid_index") if err != nil { log.Fatal(err) } }

MongoDB支持的索引有很多种,我简单的写了有过期时间的索引,文本索引以及删除索引,其他索引可以参考:indexes
关于索引的设置可以参考这里:IndexOptions
尽管索引有很多好处,但是它也有一些限制,参考这里:index-limitations

今天的文章就到这吧,篇幅有点长,哈哈,简单记录下,当然这篇文章并不是最详细的,因为每一个模块都值得去深入研究。后续如果有时间在深入写点文章吧。
转载请注明出处,谢谢。



标签
评论一下

评论列表

暂时没有评论,快来评论吧..