from pymongo import MongoClient from bson import ObjectId, json_util import re # 主环境 (生产环境为production,开发环境为development) setting = 'production' # 获取数据集 def col(arg): conn = MongoClient('mongodb://ccb:wL2wG2aD6aI1@mongo:27017/ccb') if setting == 'development': arg += '_test' if arg == 'user_info': return conn.ccb.user_info elif arg == 'user_info_test': return conn.ccb.user_info_test else: return False # 查询有无用户 def searchUser(mail_addr): try: user_info = col('user_info').find_one({"mail_addr": mail_addr}) if user_info: user_info.pop("_id") return {'errcode': 200, 'user_info': user_info, 'errmsg': 'ok'} else: return {'errcode': 101, 'errmsg': '用户不存在'} except Exception as e: print(e) # id不合法 return {'errcode': 102, 'errmsg': 'user_info获取失败'} # 插入一个新的用户 def insertUser(data): try: col('user_info').insert_one(data) except Exception as e: # 插入失败 return {'errcode': 103, 'errmsg': '新用户插入失败'} return {'errcode': 200, 'errmsg': '新用户插入成功'} # 更新密码本 def updateCodebook(mail_addr, content, update_time): try: res = col('user_info').update({"mail_addr": mail_addr}, {'$set': {'codebook': content, 'update_time': update_time}}) # 判断有没有是否成功 if(res['updatedExisting']): return {'errcode': 200, 'errmsg': 'codebook覆写成功', 'update_time': update_time} else: return { 'errcode': 301, 'errmsg': 'codebook用户不存在'} except Exception as e: return {'errcode': 302, 'errmsg': 'codebook覆写失败'} # 激活用户 def ActivateUser(uuid): try: res = col('user_info').update({"uuid": uuid}, {'$set': {'active': True}}) # 判断有没有是否成功 if(res['updatedExisting']): return {'errcode': 200, 'errmsg': 'active覆写成功'} else: return { 'errcode': 401, 'errmsg': 'active用户不存在'} except Exception as e: return {'errcode': 402, 'errmsg': 'active覆写失败'} # 查询有无用户 def searchUserByUuid(uuid): try: user_info = col('user_info').find_one({"uuid": uuid}) if user_info: return {'errcode': 200, 'user_info': user_info, 'errmsg': 'ok'} else: return {'errcode': 404, 'errmsg': '用户不存在'} except Exception as e: print(e) # id不合法 return {'errcode': 405, 'errmsg': 'user_info获取失败'}