Bulk-insert records into Cassandra or ScyllaDB.

Bulk Installer ScyllaDB/Cassandra

Insert over 20 million records within minutes.

Special thanks 🤝 to @Reeya Patel

Install using

pip install rscylla

Usage

from rscylla.cql import Cql
# hostslist = ['127.0.0.1']
# port = 9042
# table_name = keyspace.table_name [required]
# username = cassandra
# password  = cassandra
obj = Cql(hostslist,port,table_name,username,password)
# default chunks = 10,000
# default workers = 4
obj.insert(filename,chuncks,workers) # filename or filepath

Example:

from rscylla.cql import Cql
obj = Cql(["localhost"],9042,"keyspace.table_name","cassandra","cassandra")
obj.insert("data.csv") # optional: chuncks and workers
# way 2 (the keyword is spelled chuncks in the insert signature)
obj.insert("data.csv",chuncks=1000,workers=2)

Result

  • Tested on a machine with 4 GB RAM and an i3 processor.

Rows          Duration
10,000        0.03 s
300,000       2 min
20,000,000    66 min

Note: Results depend on your hardware capability!
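
To compare the rows in the table, the durations can be converted into rows per second. This is only arithmetic on the figures reported above; no new measurements are involved.

```python
# (rows, duration in seconds) taken from the results table above
results = [(10_000, 0.03), (300_000, 2 * 60), (20_000_000, 66 * 60)]

# Throughput for each run, in rows per second
rates = [rows / seconds for rows, seconds in results]

for (rows, _), rate in zip(results, rates):
    print(f"{rows:>12,} rows: {rate:,.0f} rows/s")
```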

Work flow

  1. Import packages
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider # authenticate user
import pandas as pd
import numpy as np
import threading
import time
  1. The Cql class constructor accepts:
    • list of hosts : list of strings
    • port : integer
    • table_name : string, formatted as keyspace.table_name
    • username : string
    • password : string
class Cql:
    def __init__(self,hosts:list,port:int,table_name:str,user:str,password:str) -> None:
  1. In the constructor __init__, initialize the cluster and the authenticator used to connect to the ScyllaDB server.
self.start_time = time.time() # record the start time to report the total duration
# if the table name is not formatted as keyspace.table_name, raise an exception
if len(table_name.split("."))!=2: raise Exception("Table name must be! keyspace.table_name")
# table_name.split('.') returns [keyspace,table_name]
# keyspace
self.keyspace_name = table_name.split(".")[0]
# table name
self.table_name = table_name.split(".")[1]
# set authenticator with username & password
auth = PlainTextAuthProvider(username=user,password=password)
# set cluster, hosts,port and auth
self.cluster = Cluster(contact_points=hosts,port=port,auth_provider=auth)
# set keyspace
self.session = self.cluster.connect(self.keyspace_name)
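
The table-name check in the constructor can be tried without a running cluster. The following is a minimal sketch, assuming a hypothetical helper named split_table_name that is not part of rscylla. It is slightly stricter than the check above because it also rejects empty parts such as ".student".

```python
from typing import Tuple


def split_table_name(table_name: str) -> Tuple[str, str]:
    """Split 'keyspace.table_name' into (keyspace, table). Hypothetical helper."""
    parts = table_name.split(".")
    # Exactly two non-empty parts are expected: keyspace and table
    if len(parts) != 2 or not all(parts):
        raise ValueError("Table name must be keyspace.table_name")
    return parts[0], parts[1]


keyspace, table = split_table_name("school.student")
print(keyspace, table)
```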
  1. The insert method accepts:
    • file_name is required; chunks and workers are optional
# chunks default = 10000
# workers default = 4
def insert(self,file_name:str,chuncks=10000,workers=4)->None:
  1. Get all columns and data types of the given table to avoid data type conflicts.
    • The table's columns and data types are read from the system_schema keyspace.
table_schema = self.session.execute("SELECT column_name,type FROM system_schema.columns WHERE keyspace_name='%s' AND table_name='%s'"%(self.keyspace_name,self.table_name))     
  1. Dictionary that maps CQL data types to Python data types.
    • CQL data type as key and Python data type as value
self.cql_to_python_type = { 
            'NULL': None,
            'boolean': bool,
            'float': float,
            'double': float,
            'int':int,
            'smallint':int, 
            'tinyint':int,
            'counter':int,
            'varint':int,
            'bigint': int,
            'decimal': float,
            'ascii':str,
            'varchar':str,
            'text': str,
            'blob': str,
            'date': str,
            'timestamp': str,
            'time': str,
            'list': list,
            'set': set,
            'map': dict,
            'timeuuid': None,
            'uuid': None 
        }
  1. Create the self.cql_types dictionary for the pandas DataFrame, and store all column names from table_schema (extracted from the system_schema keyspace for the given table) in self.cql_columns.
self.cql_types = {}
self.cql_columns=[]
# create dictionary for the data frame: column_name:python_datatype
for row in table_schema: # iterate extracted column_names & data types
    # row contains column_name & type
    # create key as column_name & value as python data type get from cql_to_python_type dictionary
    self.cql_types[row.column_name] = self.cql_to_python_type[row.type]
    # add column name into list
    self.cql_columns.append(row.column_name)

Example:

    # if the student table contains two columns:
    1. id : bigint
    2. name : text

    # they are converted to
    cql_types = {
        "id":int,
        "name":str
    }
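
The mapping step can be sketched without a database. The example below assumes a namedtuple called Row as a stand-in for the rows returned by the schema query, and a shortened mapping called CQL_TO_PYTHON; both names are hypothetical. It also assumes that collection columns may be reported with their parameters (for example list<text>), so it strips the parameters before the lookup and uses .get so that an unmapped type yields None instead of a KeyError. Neither behaviour is claimed for rscylla itself.

```python
from collections import namedtuple

# Stand-in for rows of "SELECT column_name,type FROM system_schema.columns" (assumption)
Row = namedtuple("Row", ["column_name", "type"])

# Shortened version of the CQL-to-Python mapping shown above
CQL_TO_PYTHON = {
    "bigint": int, "int": int, "double": float, "boolean": bool,
    "text": str, "list": list, "set": set, "map": dict,
}


def base_type(cql_type: str) -> str:
    """Drop type parameters, e.g. 'list<text>' becomes 'list'."""
    return cql_type.split("<", 1)[0].strip()


def build_type_map(rows):
    """Return {column_name: python_type} for the given schema rows."""
    return {row.column_name: CQL_TO_PYTHON.get(base_type(row.type)) for row in rows}


rows = [Row("id", "bigint"), Row("name", "text"), Row("tags", "list<text>")]
cql_types = build_type_map(rows)
print(cql_types)
```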
  1. Create comma-separated strings for the insert statement
param_keys = ",".join(self.cql_columns) 
param_values = ",".join([ '%('+k+')s' for k in self.cql_columns])
self.insert__statement = "INSERT INTO "+self.table_name+" ("+param_keys+") VALUES ("+param_values+")"

Example:

if cql_columns = ['id','name']
then
param_keys = 'id,name'
# named placeholders, so each row can be bound as a dictionary (json object)
param_values = '%(id)s,%(name)s'
# the execute method binds the dictionary values to param_values directly
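
The string building above can be checked in isolation. This is a small sketch that wraps the same three lines in a hypothetical function named build_insert_statement; the table and column names are example values.

```python
def build_insert_statement(table_name, columns):
    """Build an INSERT statement with named placeholders (hypothetical helper)."""
    param_keys = ",".join(columns)
    # One %(column)s placeholder per column, matched by key against a row dict
    param_values = ",".join("%(" + column + ")s" for column in columns)
    return "INSERT INTO " + table_name + " (" + param_keys + ") VALUES (" + param_values + ")"


statement = build_insert_statement("student", ["id", "name"])
print(statement)  # INSERT INTO student (id,name) VALUES (%(id)s,%(name)s)
```

Because the placeholders are named, a row such as {"id": 1, "name": "a"} supplies its values by key, and the order of keys in the dictionary does not matter.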
  1. Read the file based on its extension
    • pandas read_csv accepts a file path and a chunksize, which splits a huge file into chunks so it is not loaded all at once.
    • supported files are: .csv, .json, .xls, .xlsx
# supported files
file_ext = file_name.split(".")[-1] # get extension
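if file_ext=="csv": # if csv, read it in chunks
    data_frames = pd.read_csv(file_name,chunksize=chuncks)
elif file_ext=="json":
    data_frames = pd.read_json(file_name,chunksize=chuncks,lines=True)
elif file_ext in ["xls","xlsx"]:
    # pd.read_excel has no chunksize option, so load the sheet and slice it into chunks
    excel_df = pd.read_excel(file_name)
    data_frames = (excel_df.iloc[i:i+chuncks].copy() for i in range(0,len(excel_df),chuncks))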
else:
    raise Exception("%s is not supported!"%(file_name))

Example:

data_frames holds a pandas TextFileReader object (for CSV input); each call to next(data_frames) returns the next chunk as a DataFrame.
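
The idea of pulling one chunk at a time with next() can be illustrated with the standard library alone. The sketch below is a stand-in, not what rscylla does: it uses csv.DictReader and itertools.islice instead of pandas, and the function name iter_csv_chunks and the sample data are made up for the example.

```python
import csv
import io
from itertools import islice


def iter_csv_chunks(handle, chunk_size):
    """Yield lists of at most chunk_size row dicts from an open CSV file."""
    reader = csv.DictReader(handle)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return  # input exhausted: the next call to next() raises StopIteration
        yield chunk


sample = io.StringIO("id,name\n1,a\n2,b\n3,c\n")
chunks = iter_csv_chunks(sample, chunk_size=2)

first = next(chunks)   # two rows
second = next(chunks)  # the remaining row
print(len(first), len(second))
```

Only one chunk is held in memory at a time, which is the property the chunksize argument provides for large files.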
  1. Create a placeholder list with one slot per worker thread
threads = list(range(workers))
# e.g workers = 4
# threads = [0,1,2,3]
  1. Start a while loop to read data_frames:
  • df = next(data_frames) fetches a data frame from the TextFileReader; every iteration fetches a new frame.
  • The first for loop assigns a thread object to position threads[i] and starts the thread.
  • The thread is given the execute_insert method as its target and the data frame as args = (dataframe,).
  • The second for loop waits for each threads[i] to complete its execution.
while True:
    try:
        # start thread
        for i in range(workers):
            # fetch next dataframe
            df = next(data_frames)
            threads[i] = threading.Thread(target=self.execute_insert,args=(df,))
            threads[i].start()

        # wait until complete task
        for i in range(workers):
            threads[i].join()
    # on any exception (including StopIteration when the file is exhausted), print it and stop
    except Exception as e:
        print(e)
        break
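
In the loop above, when the input runs out partway through a batch, the break is reached before the join loop, so threads already started in that last batch are not joined inside insert. The following is a sketch of one possible variant that joins every batch, including a partial final one. The function name run_in_batches and the recording worker are hypothetical; plain lists stand in for data frames.

```python
import threading


def run_in_batches(chunks, worker, workers=4):
    """Run worker(chunk) on up to `workers` threads at a time and join each batch."""
    chunks = iter(chunks)
    exhausted = False
    while not exhausted:
        batch = []
        for _ in range(workers):
            try:
                chunk = next(chunks)
            except StopIteration:
                exhausted = True  # no more input: finish the current batch and stop
                break
            thread = threading.Thread(target=worker, args=(chunk,))
            thread.start()
            batch.append(thread)
        # Wait for every thread that was started in this batch
        for thread in batch:
            thread.join()


seen = []
lock = threading.Lock()


def record(chunk):
    # Stand-in for execute_insert: remember which chunks were processed
    with lock:
        seen.append(chunk)


run_in_batches([[1, 2], [3, 4], [5]], record, workers=2)
print(sorted(seen))
```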
  1. Create the execute_insert method.
  • It accepts a DataFrame as its parameter.
def execute_insert(self,df:pd.DataFrame)->None:
  1. In the method:
  • Convert column names to lower case (CQL stores unquoted column names in lower case).
  • Handle nulls:
    • df.select_dtypes(include=np.number).columns returns the names of the numeric (float, int) columns.
    • df.select_dtypes(include=np.number).fillna(0) selects the numeric columns and fills nulls with 0.
    • df.select_dtypes(exclude=np.number).fillna('') selects the non-numeric columns and fills nulls with an empty string.
  • Iterate over the rows of df (the DataFrame), calling session.execute_async with the insert statement and the row (a dict).
    • session.execute_async returns a future object, so result() must be called to wait for the insert to complete.
    • Each future is appended to the futures list.
  • A second loop iterates over the future objects and calls result() on each, which waits for every session.execute_async call to finish.

NOTE: pandas sets NaN by default wherever a value is missing, and CQL raises an exception for it, so nulls are filled with 0 (numeric columns) or an empty string (other columns).
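
The null handling can be tried on a small frame without a database. This is a minimal sketch using a made-up two-column DataFrame; it applies the same select_dtypes and fillna calls described above.

```python
import numpy as np
import pandas as pd

# Made-up sample data with one missing value per column
df = pd.DataFrame({"ID": [1, 2, None], "Name": ["a", None, "c"]})

# Lower-case the column names for CQL
df.columns = [column.lower() for column in df.columns]

# Fill missing values: 0 for numeric columns, empty string for the rest
numeric_columns = df.select_dtypes(include=np.number).columns
other_columns = df.select_dtypes(exclude=np.number).columns
df[numeric_columns] = df[numeric_columns].fillna(0)
df[other_columns] = df[other_columns].fillna("")

records = df.to_dict(orient="records")
print(records)
```

A numeric column that contained a missing value is held as float by pandas, so the filled values in that column are floats (0.0 rather than 0).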

# convert column names to lower case for cql
df.columns = map(str.lower,df.columns)
# handle nulls in numeric columns: fill with 0
df[df.select_dtypes(include=np.number).columns] = df.select_dtypes(include=np.number).fillna(0) 
# handle nulls in object and string columns: fill with an empty string
df[df.select_dtypes(exclude=np.number).columns] = df.select_dtypes(exclude=np.number).fillna('') 
# empty list for the future objects
futures = []
# execute async & store each future object
for row in df.to_dict(orient='records'): # iterate over each record
   futures.append(self.session.execute_async(self.insert__statement,row))
# wait for the async inserts to complete
for future in futures:
   future.result() # block until this insert query has finished
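
The two-loop pattern (submit everything first, then wait on each future) can be sketched without a cluster. The example below swaps in concurrent.futures from the standard library for the driver: FakeSession is a hypothetical stand-in whose execute_async returns immediately with a future, and it simply stores rows in memory.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeSession:
    """Hypothetical stand-in for a driver session; keeps rows in a list."""

    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=4)
        self.stored = []

    def execute_async(self, statement, row):
        # Schedule the "insert" and return a future without waiting for it
        return self._pool.submit(self.stored.append, row)


session = FakeSession()
statement = "INSERT INTO student (id,name) VALUES (%(id)s,%(name)s)"
rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 3, "name": "c"}]

# First loop: submit every row without blocking
futures = [session.execute_async(statement, row) for row in rows]

# Second loop: block until each submitted insert has finished
for future in futures:
    future.result()

print(len(session.stored))
```

Submitting all rows before waiting lets the requests overlap; calling result() right after each submit would serialize them.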
