Bulk records add into Cassandra or ScyllaDB.
Project description
Bulk Installer ScyllaDB/Cassandra 
Insert over 20 million records within minutes.
Special thanks 🤝 to @Reeya Patel
Install using
pip install rscylla
Usage
from rscylla.cql import Cql
# hostslist = ['127.0.0.1']
# port = 9042
# table_name = keyspace.table_name [must required]
# username = cassandra
# password = cassandra
obj = Cql(hostslist,port,table_name,username,password)
# default chunks = 10,000
# default workers = 4
obj.insert(filename,chuncks,workers) # filename or filepath
Example:
from rscylla.cql import Cql
obj = Cql(["localhost"],9042,"keyspace.table_name","cassandra","cassandra")
obj.insert("data.csv") # optional : chuncks and workers
# way 2
obj.insert("data.csv",chuncks=1000,workers=2)
Result
- Tested on 4GB RAM with i3 processor.
| Rows | Duration |
|---|---|
| 10,000 | 0.03s |
| 300,000 | 2m |
| 20,000,000 | 66m |
| Note: Depends on your hardware capability! |
Work flow
- Import packages
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider # authenticate user
import pandas as pd
import numpy as np
import threading
import time
- class Cql constructor accepts:
- list of hosts : list of strings
- port : integer
- table_name : string in the form keyspace.table_name
- username : string
- password : string
class Cql:
    """Bulk-insert client for ScyllaDB/Cassandra.

    Connects to the cluster on construction and exposes ``insert`` to load
    a CSV/JSON/Excel file into one table using parallel chunked writes.
    """

    def __init__(self, hosts: list, port: int, table_name: str, user: str, password: str) -> None:
        """Open an authenticated session bound to the table's keyspace.

        Args:
            hosts: contact points, e.g. ``['127.0.0.1']``.
            port: CQL native-transport port (typically 9042).
            table_name: fully qualified name, ``keyspace.table_name``.
            user: username for plain-text authentication.
            password: password for plain-text authentication.

        Raises:
            ValueError: if ``table_name`` is not of the form
                ``keyspace.table_name``.
        """
        # Remember when loading started so total duration can be reported.
        self.start_time = time.time()
        # Split once; a fully qualified table name has exactly two parts.
        parts = table_name.split(".")
        if len(parts) != 2:
            raise ValueError("table_name must be of the form 'keyspace.table_name'")
        self.keyspace_name, self.table_name = parts
        # Plain-text credentials for every node in the cluster.
        auth = PlainTextAuthProvider(username=user, password=password)
        self.cluster = Cluster(contact_points=hosts, port=port, auth_provider=auth)
        # Connecting with the keyspace makes it the session default, so
        # later statements can reference the bare table name.
        self.session = self.cluster.connect(self.keyspace_name)
- insert method accepts:
- For this method, chunks and workers are optional
# chunks default = 10000
# workers default = 4
def insert(self, file_name: str, chuncks: int = 10000, workers: int = 4) -> None:
    """Bulk-load ``file_name`` into the configured table.

    The file is read in chunks of ``chuncks`` rows; up to ``workers``
    chunks are inserted concurrently, each on its own thread, and every
    batch of threads is joined before the next batch starts.

    Supported extensions: .csv, .json (line-delimited), .xls, .xlsx.

    Args:
        file_name: path of the data file; the extension picks the reader.
        chuncks: rows per chunk (misspelled name kept for compatibility).
        workers: number of concurrent insert threads per batch.

    Raises:
        Exception: if the file extension is not supported.
    """
    # Fetch the table's column names and CQL types from the system schema
    # so file values can be mapped onto compatible Python types.
    # Fix: bind the names as parameters instead of %-formatting the CQL.
    table_schema = self.session.execute(
        "SELECT column_name,type FROM system_schema.columns "
        "WHERE keyspace_name=%s AND table_name=%s",
        (self.keyspace_name, self.table_name),
    )

    # CQL data type (key) -> Python data type (value).
    self.cql_to_python_type = {
        'NULL': None,
        'boolean': bool,
        'float': float,
        'double': float,
        'int': int,
        'smallint': int,
        'tinyint': int,
        'counter': int,
        'varint': int,
        'bigint': int,
        'decimal': float,
        'ascii': str,
        'varchar': str,
        'text': str,
        'blob': str,
        'date': str,
        'timestamp': str,
        'time': str,
        'list': list,
        'set': set,
        'map': dict,
        # Fix: the key was 'timeuuid,' (stray comma inside the string),
        # so any timeuuid column raised KeyError below.
        'timeuuid': None,
        'uuid': None,
    }

    # column_name -> Python type (for the DataFrame), plus the ordered
    # column list used to build the insert statement.
    # e.g. a table with id:bigint, name:text yields {'id': int, 'name': str}.
    self.cql_types = {}
    self.cql_columns = []
    for row in table_schema:
        self.cql_types[row.column_name] = self.cql_to_python_type[row.type]
        self.cql_columns.append(row.column_name)

    # Build "INSERT INTO t (a,b) VALUES (%(a)s,%(b)s)" so each row can be
    # bound as a dict by the driver's execute/execute_async.
    param_keys = ",".join(self.cql_columns)
    param_values = ",".join('%(' + k + ')s' for k in self.cql_columns)
    self.insert__statement = (
        "INSERT INTO " + self.table_name
        + " (" + param_keys + ") VALUES (" + param_values + ")"
    )

    # Pick a chunked reader based on the file extension.
    file_ext = file_name.split(".")[-1]
    if file_ext == "csv":
        data_frames = pd.read_csv(file_name, chunksize=chuncks)
    elif file_ext == "json":
        data_frames = pd.read_json(file_name, chunksize=chuncks, lines=True)
    elif file_ext in ("xls", "xlsx"):
        # Fix: pd.read_excel has no chunksize parameter (passing one raises
        # TypeError), so read the sheet once and slice it into chunks.
        whole = pd.read_excel(file_name)
        data_frames = (
            whole.iloc[start:start + chuncks]
            for start in range(0, len(whole), chuncks)
        )
    else:
        raise Exception("%s is not supported!" % (file_name))

    # Consume the reader `workers` chunks at a time: one thread per chunk,
    # then join the whole batch before fetching the next one.
    exhausted = False
    while not exhausted:
        threads = []
        try:
            for _ in range(workers):
                df = next(data_frames)  # StopIteration ends the load
                worker = threading.Thread(target=self.execute_insert, args=(df,))
                worker.start()
                threads.append(worker)
        except StopIteration:
            # Reader exhausted: join the partial batch below, then stop.
            exhausted = True
        except Exception as e:
            # Unexpected failure: report it and stop reading, but (fix)
            # still join the threads already started in this batch --
            # the original left them running un-joined.
            print(e)
            exhausted = True
        for worker in threads:
            worker.join()
- Created method execute_insert.
- Accept parameter DataFrame
def execute_insert(self, df: pd.DataFrame) -> None:
    """Asynchronously insert every row of ``df``, then wait for completion.

    Column names are lower-cased to match CQL's lower-case column names,
    and missing values are filled in (0 for numeric columns, '' for the
    rest) because pandas marks them as NaN, which CQL rejects.
    """
    # CQL column names are all lower case.
    df.columns = map(str.lower, df.columns)
    # Replace NaN: 0 for numeric columns, empty string for everything else.
    numeric = df.select_dtypes(include=np.number)
    df[numeric.columns] = numeric.fillna(0)
    other = df.select_dtypes(exclude=np.number)
    df[other.columns] = other.fillna('')
    # Fire all inserts asynchronously; each call returns a future.
    futures = [
        self.session.execute_async(self.insert__statement, record)
        for record in df.to_dict(orient='records')
    ]
    # Block on every future so returning really means "chunk written".
    for future in futures:
        future.result()
Authors :
-
🙋♀️ Reeya Patel
-
🙋♂️ Meet Rathod
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
rscylladb-1.0.5.tar.gz
(10.4 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rscylladb-1.0.5.tar.gz.
File metadata
- Download URL: rscylladb-1.0.5.tar.gz
- Upload date:
- Size: 10.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.8.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1b1d77ae5d65cc1dee886ff4f86d991cb41146bbbb4718b50569cbe36e78cec5
|
|
| MD5 |
86aad227df7737228d6bf5614cd1066f
|
|
| BLAKE2b-256 |
1f84feab73d623ed19cf38a6dd567a4458e655a7e54c39999a3c2f5ea1dbf787
|
File details
Details for the file rscylladb-1.0.5-py3-none-any.whl.
File metadata
- Download URL: rscylladb-1.0.5-py3-none-any.whl
- Upload date:
- Size: 9.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.0.0 CPython/3.8.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3357559773d486f423166b1f90a05dbc4053de4bb325302f287d25fb4e6e5a97
|
|
| MD5 |
b0979039877f2bc7b0535ac429d2ab55
|
|
| BLAKE2b-256 |
be8bccc2cc6d2f45350be6e56aae187f1c814f2f8f91c0cce4f3166258bf2ca7
|