-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlib_db.py
139 lines (113 loc) · 4.56 KB
/
lib_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3.5
# -*- coding: UTF-8 -*-
try:
from pymongo import MongoClient
except ImportError:
raise ImportError('PyMongo is not installed in your machine.')
class MongoDB(object):
    """Small convenience wrapper around a PyMongo client.

    An instance holds one client connection plus an optional "current"
    database handle and "current" collection handle.  Most methods operate
    on the current collection and raise ``ValueError`` when it is not set.

    NOTE(review): several calls below (``collection_names``,
    ``database_names``, ``count``, ``remove``, ``update``) are legacy
    PyMongo 3.x APIs -- confirm the pinned PyMongo version before upgrading.
    """

    def __init__(self, host='localhost', port=27017,
                 database_name=None, collection_name=None,
                 drop_n_create=False):
        """Create the client and optionally select a database/collection.

        Args:
            host: Host passed to ``MongoClient``.
            port: Port passed to ``MongoClient``.
            database_name: Database to select; nothing is selected if falsy.
            collection_name: Collection to select inside ``database_name``.
            drop_n_create: When true and ``database_name`` is given, drop
                that database before selecting it.

        Raises:
            ValueError: If ``collection_name`` is given without a database.
        """
        # Errors from MongoClient propagate unchanged so callers keep the
        # original exception type (they are still ``Exception`` subclasses).
        self._connection = MongoClient(host=host, port=port, maxPoolSize=200)
        self._database = None
        self._collection = None

        # Only drop when a name was actually supplied; otherwise
        # ``str(None)`` would target a database literally called "None".
        if drop_n_create and database_name:
            self.drop_db(database_name)

        if database_name:
            self._database = self._connection[database_name]

        if collection_name:
            # A collection cannot be resolved without a database handle.
            self.check_db()
            self._collection = self._database[collection_name]

    @staticmethod
    def check_state(obj):
        """Return ``True`` if ``obj`` is truthy, ``False`` otherwise.

        Kept for backward compatibility.  The handle checks below compare
        with ``None`` instead, because a valid handle is not guaranteed to
        be truthy (or to support truth testing at all).
        """
        return bool(obj)

    def check_db(self):
        """Raise ``ValueError`` if no database is currently selected."""
        if self._database is None:
            raise ValueError('Database is empty/not created.')

    def check_collection(self):
        """Raise ``ValueError`` if no collection is currently selected."""
        if self._collection is None:
            raise ValueError('Collection is empty/not created.')

    def get_overall_details(self):
        """Return a dict mapping each database name to its collection names."""
        client = self._connection
        return {
            db_name: list(client[db_name].collection_names())
            for db_name in client.database_names()
        }

    def get_current_status(self):
        """Return the current connection, database and collection handles."""
        return {
            'connection': self._connection,
            'database': self._database,
            'collection': self._collection
        }

    def create_db(self, database_name=None):
        """Select ``database_name`` as the current database.

        The current collection handle is left untouched.
        """
        self._database = self._connection[database_name]

    def create_collection(self, collection_name=None):
        """Select ``collection_name`` in the current database.

        Raises:
            ValueError: If no database is currently selected.
        """
        self.check_db()
        self._collection = self._database[collection_name]

    def get_database_names(self):
        """Return the database names reported by the connection."""
        return self._connection.database_names()

    def get_collection_names(self):
        """Return the non-system collection names of the current database.

        Raises:
            ValueError: If no database is currently selected.
        """
        # The listing is read from the database handle, so that is the
        # handle that has to be validated (not the collection).
        self.check_db()
        return self._database.collection_names(include_system_collections=False)

    def drop_db(self, database_name):
        """Drop ``database_name`` and clear the current database/collection.

        Returns:
            Whatever the connection's ``drop_database`` call returns.

        Raises:
            ValueError: If ``database_name`` is ``None``.
        """
        if database_name is None:
            # str(None) would otherwise drop a database named "None".
            raise ValueError('Database name is required.')
        self._database = None
        self._collection = None
        return self._connection.drop_database(str(database_name))

    def drop_collection(self):
        """Drop the current collection and clear the handle.

        Raises:
            ValueError: If no collection is currently selected.
        """
        self.check_collection()
        self._collection.drop()
        self._collection = None

    def insert(self, post):
        """Insert a single record and return its inserted id.

        Raises:
            ValueError: If no collection is currently selected.
        """
        self.check_collection()
        return self._collection.insert_one(post).inserted_id

    def insert_many(self, posts):
        """Insert multiple records and return their inserted ids.

        Raises:
            ValueError: If no collection is currently selected.
        """
        self.check_collection()
        return self._collection.insert_many(posts).inserted_ids

    def find_one(self, *args, count=False):
        """Return the result of ``find_one(*args)`` on the current collection.

        Args:
            *args: Forwarded unchanged to the collection call.
            count: When true, return ``find(*args).count()`` instead.

        Raises:
            ValueError: If no collection is currently selected.
        """
        self.check_collection()
        if count:
            return self._collection.find(*args).count()
        return self._collection.find_one(*args)

    def find(self, *args, count=False):
        """Return the result of ``find(*args)`` on the current collection.

        Args:
            *args: Forwarded unchanged to the collection call.
            count: When true, return ``find(*args).count()`` instead.

        Raises:
            ValueError: If no collection is currently selected.
        """
        self.check_collection()
        matches = self._collection.find(*args)
        if count:
            return matches.count()
        return matches

    def count(self):
        """Return the record count of the current collection.

        Raises:
            ValueError: If no collection is currently selected.
        """
        self.check_collection()
        return self._collection.count()

    def remove(self, *args):
        """Forward ``*args`` to ``remove`` on the current collection.

        Raises:
            ValueError: If no collection is currently selected.
        """
        self.check_collection()
        return self._collection.remove(*args)

    def update(self, *args):
        """Forward ``*args`` to ``update`` on the current collection.

        Raises:
            ValueError: If no collection is currently selected.
        """
        self.check_collection()
        return self._collection.update(*args)

    def aggregate(self, *args):
        """Forward ``*args`` to ``aggregate`` on the current collection.

        Raises:
            ValueError: If no collection is currently selected.
        """
        self.check_collection()
        return self._collection.aggregate(*args)