Friday, 2 August 2019

Architecture of Cassandra

A Cassandra instance is a collection of independent nodes that are configured together into a cluster. In a Cassandra cluster, all nodes are peers, meaning there is no master node or centralized management process. A node joins a Cassandra cluster based on certain aspects of its configuration. Cassandra uses a protocol called gossip to discover location and state information about the other nodes participating in a Cassandra cluster. Gossip is a peer-to-peer communication protocol in which nodes periodically exchange state information about themselves and about other nodes they know about.
In Cassandra, the gossip process runs every second and exchanges state messages with up to three other nodes in the cluster. The nodes exchange information about themselves and about the other nodes that they have gossiped about, so all nodes quickly learn about all other nodes in the cluster. A gossip message has a version associated with it, so that during a gossip exchange, older information is overwritten with the most current state for a particular node.
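The version rule described above can be sketched in a few lines of Python. This is a simplified model, not Cassandra's actual gossip classes: the function name merge_gossip, the dictionary layout, and the single integer version per node are all assumptions made for illustration.

```python
# Hypothetical model: each view maps a node address to (version, state).
def merge_gossip(local, remote):
    """Return a new view that keeps, for each node, the entry with the higher version."""
    merged = dict(local)
    for node, (version, state) in remote.items():
        # Older information is overwritten only by a strictly newer version.
        if node not in merged or version > merged[node][0]:
            merged[node] = (version, state)
    return merged


local_view = {"10.0.0.1": (5, "UP"), "10.0.0.2": (3, "UP")}
remote_view = {"10.0.0.2": (7, "DOWN"), "10.0.0.3": (1, "UP")}
merged_view = merge_gossip(local_view, remote_view)
```

After the exchange, the local node has learned about 10.0.0.3 secondhand and has replaced its stale entry for 10.0.0.2.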
When a node first starts up, it looks at its configuration file to determine the name of the Cassandra cluster it belongs to and which node(s), called seeds, to contact to obtain information about the other nodes in the cluster. These cluster contact points are configured in the cassandra.yaml configuration file for a node.
To prevent partitions in gossip communications, all nodes in a cluster should have the same list of seed nodes in their configuration files. This is most critical the first time a node starts up. By default, a node will remember other nodes it has gossiped with between subsequent restarts.
The seed node designation has no purpose other than bootstrapping the gossip process for new nodes joining the cluster. Seed nodes are not a single point of failure, nor do they have any other special purpose in cluster operations beyond the bootstrapping of nodes.
Gossip Settings
The gossip settings control a node's participation in a cluster and how the node is known to the cluster.
Property - Description
• cluster_name - Name of the cluster that this node is joining. Should be the same for every node in the cluster.
• listen_address - The IP address or hostname that other Cassandra nodes will use to connect to this node. Should be changed from localhost to the public address for the host.
• seeds - A comma-delimited list of node IP addresses used to bootstrap the gossip process. Every node should have the same list of seeds. In multi data center clusters, the seed list should include a node from each data center.
• storage_port - The intra-node communication port (default is 7000). Should be the same for every node in the cluster.
• initial_token - The initial token is used to determine the range of data this node is responsible for.
About Failure Detection and Recovery
Failure detection is a method for locally determining, from gossip state, if another node in the system is up or down. Failure detection information is also used by Cassandra to avoid routing client requests to unreachable nodes whenever possible. 
The gossip process tracks heartbeats from other nodes both directly (nodes gossiping directly to it) and indirectly (nodes heard about secondhand, third hand, and so on). Rather than have a fixed threshold for marking nodes without a heartbeat as down, Cassandra uses an accrual detection mechanism to calculate a per-node threshold that takes into account network conditions, workload, or other conditions that might affect perceived heartbeat rate. During gossip exchanges, every node maintains a sliding window of inter-arrival times of gossip messages from other nodes in the cluster. From this window the failure detector computes a suspicion value, called phi, for each node; a node is marked down once its phi exceeds the configured threshold. The value of phi is based on the distribution of inter-arrival time values across all nodes in the cluster. In Cassandra, configuring the phi_convict_threshold property adjusts the sensitivity of the failure detector. The default value is fine for most situations, but DataStax recommends increasing it to 12 for Amazon EC2 due to the network congestion frequently experienced on that platform.
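To make the accrual idea concrete, here is a minimal sketch of such a detector. It assumes inter-arrival times are exponentially distributed, which is a simplification chosen for illustration and not necessarily what Cassandra does internally. The class name AccrualDetector, the window size, and the threshold of 8 are assumptions of this sketch.

```python
import math
from collections import deque


class AccrualDetector:
    """Simplified accrual failure detector for a single remote node."""

    def __init__(self, window_size=1000, threshold=8.0):
        self.intervals = deque(maxlen=window_size)  # sliding window of inter-arrival times
        self.last_heartbeat = None
        self.threshold = threshold

    def heartbeat(self, now):
        """Record a heartbeat that arrived at time `now` (in seconds)."""
        if self.last_heartbeat is not None:
            self.intervals.append(now - self.last_heartbeat)
        self.last_heartbeat = now

    def phi(self, now):
        """Suspicion level: -log10 of the probability of a gap at least this long."""
        if not self.intervals:
            return 0.0
        mean = sum(self.intervals) / len(self.intervals)
        elapsed = now - self.last_heartbeat
        # Under the exponential assumption, P(gap > elapsed) = exp(-elapsed / mean).
        return elapsed / (mean * math.log(10))

    def is_down(self, now):
        return self.phi(now) > self.threshold


detector = AccrualDetector(threshold=8.0)
tolerant = AccrualDetector(threshold=12.0)
for t in (0, 1, 2, 3):          # one heartbeat per second
    detector.heartbeat(t)
    tolerant.heartbeat(t)
```

With heartbeats one second apart, a 20 second silence convicts the node at a threshold of 8 but not at 12, which is the effect of raising the threshold on a congested network.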
Node failures can result from various causes such as hardware failures, network outages, and so on. Node outages are often transient but can last for extended intervals. A node outage rarely signifies a permanent departure from the cluster, and therefore does not automatically result in permanent removal of the node from the ring. 
Other nodes will still try to periodically initiate gossip contact with failed nodes to see if they are back up. To permanently change a node's membership in a cluster, administrators must explicitly add or remove nodes from a Cassandra cluster using the nodetool utility.
When a node comes back online after an outage, it may have missed writes for the replica data it maintains. Once the failure detector marks a node as down, missed writes are stored by other replicas if hinted handoff is enabled (for a limited period of time). However, some writes may be missed in the interval between a node actually going down and the moment it is detected as down. In addition, if a node is down for longer than max_hint_window_in_ms (one hour by default), hints will no longer be saved. For these reasons, it is best practice to routinely run nodetool repair on all nodes to ensure they have consistent data, and to also run repair after recovering a node that has been down for an extended period.
About Data Partitioning in Cassandra
When you start a Cassandra cluster, you must choose how the data will be divided across the nodes in the cluster. This is done by choosing a partitioner for the cluster.
In Cassandra, the total data managed by the cluster is represented as a circular space or ring. The ring is divided up into ranges equal to the number of nodes, with each node being responsible for one or more ranges of the overall data. Before a node can join the ring, it must be assigned a token. The token determines the node's position on the ring and the range of data it is responsible for.
Column family data is partitioned across the nodes based on the row key. To determine the node where the first replica of a row will live, Cassandra walks the ring clockwise until it locates the first node with a token value greater than or equal to that of the row key. Each node is responsible for the region of the ring between itself (inclusive) and its predecessor (exclusive). With the nodes sorted in token order, the last node is considered the predecessor of the first node; hence the ring representation.
For example, consider a simple 4 node cluster where all of the row keys managed by the cluster were numbers in the range of 0 to 100. Each node is assigned a token that represents a point in this range. In this simple example, the token values are 0, 25, 50, and 75. The first node, the one with token 0, is responsible for the wrapping range (75-0). The node with the lowest token also accepts row keys less than the lowest token and more than the highest token.
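The lookup in this example can be sketched with a binary search over the sorted tokens. The function name owner_token is hypothetical, and the sketch applies the inclusive/exclusive rule stated above (each node owns the range from its predecessor, exclusive, up to its own token, inclusive).

```python
from bisect import bisect_left

TOKENS = [0, 25, 50, 75]  # tokens from the example, in sorted order


def owner_token(key, tokens=TOKENS):
    """Return the token of the node that owns `key`."""
    # First token >= key; an index past the end wraps around to the lowest token.
    index = bisect_left(tokens, key)
    return tokens[index % len(tokens)]


print(owner_token(10))  # 25
print(owner_token(25))  # 25 (a node's own token is inclusive)
print(owner_token(80))  # 0  (wrapping range)
```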

About Partitioning in Multi-Data Center Clusters
In multi-data center deployments, replica placement is calculated per data center when using the NetworkTopologyStrategy replica placement strategy. In each data center (or replication group) the first replica for a particular row is determined by the token value assigned to a node. Additional replicas in the same data center are placed by walking the ring clockwise until the first node in another rack is reached.
If you do not calculate partitioner tokens so that the data ranges are evenly distributed for each data center, you could end up with uneven data distribution within a data center.
The goal is to ensure that the nodes for each data center have token assignments that evenly divide the overall range. Otherwise, you could end up with nodes in each data center that own a disproportionate number of row keys. Each data center should be partitioned as if it were its own distinct ring; however, token assignments within the entire cluster cannot conflict with each other (each node must have a unique token). See Calculating Tokens for a Multi-Data Center Cluster for strategies on how to generate tokens for multi-data center clusters.
Understanding the Partitioner Types
Unlike almost every other configuration choice in Cassandra, the partitioner may not be changed without reloading all of your data. It is important to choose and configure the correct partitioner before initializing your cluster. Cassandra offers a number of partitioners out-of-the-box, but the random partitioner is the best choice for most Cassandra deployments.
About the Random Partitioner
The Random Partitioner is the default partitioning strategy for a Cassandra cluster, and in almost all cases is the right choice.
Random partitioning uses consistent hashing to determine which node will store a particular row. Unlike naive modulus-by-node-count, consistent hashing ensures that when nodes are added to the cluster, the minimum possible set of data is affected.
To distribute the data evenly across the number of nodes, a hashing algorithm creates an MD5 hash value of the row key. The possible range of hash values is from 0 to 2**127. Each node in the cluster is assigned a token that represents a hash value within this range. A node then owns the rows with a hash value less than its token number. For single data center deployments, tokens are calculated by dividing the hash range by the number of nodes in the cluster. For multi data center deployments, tokens are calculated per data center (the hash range should be evenly divided for the nodes in each replication group).
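As a rough illustration of the single data center case, the sketch below divides the hash range evenly among the nodes and maps a row key into that range. The names initial_tokens and key_hash are hypothetical, and reducing the MD5 digest with a modulo is an assumption made to keep the example short; it is not claimed to match Cassandra's exact token computation.

```python
import hashlib

RING_SIZE = 2 ** 127  # upper bound of the hash range given in the text


def initial_tokens(node_count):
    """Evenly spaced tokens for a single data center with `node_count` nodes."""
    return [i * RING_SIZE // node_count for i in range(node_count)]


def key_hash(row_key):
    """Illustrative only: fold the 128-bit MD5 digest of `row_key` (bytes) into the ring."""
    digest = hashlib.md5(row_key).digest()
    return int.from_bytes(digest, "big") % RING_SIZE


for token in initial_tokens(4):
    print(token)
```

Each printed value is what would be placed in initial_token for one of the four nodes.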
The primary benefit of this approach is that once your tokens are set appropriately, data from all of your column families is evenly distributed across the cluster with no further effort. For example, one column family could be using user names as the row key and another column family timestamps, but the row keys from each individual column family are still spread evenly. This also means that read and write requests to the cluster will be evenly distributed.
Another benefit of using random partitioning is the simplification of load balancing a cluster. Because each part of the hash range will receive an equal number of rows on average, it is easier to correctly assign tokens to new nodes.
About Ordered Partitioners
Using an ordered partitioner ensures that row keys are stored in sorted order. Unless absolutely required by your application, DataStax strongly recommends choosing the random partitioner over an ordered partitioner.
Using an ordered partitioner allows range scans over rows, meaning you can scan rows as though you were moving a cursor through a traditional index. For example, if your application has user names as the row key, you can scan rows for users whose names fall between Jake and Joe. This type of query would not be possible with randomly partitioned row keys, since the keys are stored in the order of their MD5 hash (not sequentially).
Although having the ability to do range scans on rows sounds like a desirable feature of ordered partitioners, there are ways to achieve the same functionality using column family indexes. Most applications can be designed with a data model that supports ordered queries as slices over a set of columns rather than range scans over a set of rows.
Using an ordered partitioner is not recommended for the following reasons:
• Sequential writes can cause hot spots. If your application tends to write or update a sequential block of rows at a time, then the writes will not be distributed across the cluster; they will all go to one node. This is frequently a problem for applications dealing with timestamped data.
• More administrative overhead to load balance the cluster. An ordered partitioner requires administrators to manually calculate token ranges based on their estimates of the row key distribution. In practice, this requires actively moving node tokens around to accommodate the actual distribution of data once it is loaded.
• Uneven load balancing for multiple column families. If your application has multiple column families, chances are that those column families have different row keys and different distributions of data. An ordered partitioner that is balanced for one column family may cause hot spots and uneven distribution for another column family in the same cluster.
There are three choices of built-in ordered partitioners that come with Cassandra. Note that the OrderPreservingPartitioner and CollatingOrderPreservingPartitioner are deprecated as of Cassandra 0.7 in favor of the ByteOrderedPartitioner:
• ByteOrderedPartitioner - Row keys are stored in order of their raw bytes rather than converting them to encoded strings. Tokens are calculated by looking at the actual values of your row key data and using a hexadecimal representation of the leading character(s) in a key. For example, if you wanted to partition rows alphabetically, you could assign an A token using its hexadecimal representation of 41.
• OrderPreservingPartitioner - Row keys are stored in order based on the UTF-8 encoded value of the row keys. Requires row keys to be UTF-8 encoded strings.
• CollatingOrderPreservingPartitioner - Row keys are stored in order based on the United States English locale (EN_US). Also requires row keys to be UTF-8 encoded strings.
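For the ByteOrderedPartitioner, the hexadecimal token for a leading character such as A can be derived as follows. The helper name byte_ordered_token is hypothetical, and the sketch assumes the row keys are UTF-8 text.

```python
def byte_ordered_token(prefix):
    """Hex representation of the raw bytes of a row key prefix."""
    return prefix.encode("utf-8").hex()


print(byte_ordered_token("A"))  # 41
```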
About Replication in Cassandra
Replication is the process of storing copies of data on multiple nodes to ensure reliability and fault tolerance. When you create a keyspace in Cassandra, you must decide the replica placement strategy: the number of replicas and how those replicas are distributed across nodes in the cluster. The replication strategy relies on the cluster-configured snitch to help it determine the physical location of nodes and their proximity to each other.
The total number of replicas across the cluster is often referred to as the replication factor. A replication factor of 1 means that there is only one copy of each row. A replication factor of 2 means two copies of each row. All replicas are equally important; there is no primary or master replica in terms of how read and write requests are handled.
As a general rule, the replication factor should not exceed the number of nodes in the cluster. However, it is possible to increase replication factor, and then add the desired number of nodes afterwards. When replication factor exceeds the number of nodes, writes will be rejected, but reads will be served as long as the desired consistency level can be met.
About Replica Placement Strategy
The replica placement strategy determines how replicas for a keyspace are distributed across the cluster. The replica placement strategy is set when you create a keyspace.
There are a number of strategies to choose from based on your goals and the information you have about where nodes are located.
SimpleStrategy
SimpleStrategy is the default replica placement strategy when creating a keyspace using the Cassandra CLI. Other interfaces, such as the CQL utility, require you to explicitly specify a strategy.
SimpleStrategy places the first replica on a node determined by the partitioner. Additional replicas are placed on the next nodes clockwise in the ring without considering rack or data center location.
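The placement rule above can be sketched as a walk around a sorted ring. The function name simple_strategy_replicas, the node names, and the small integer tokens are assumptions for illustration only.

```python
from bisect import bisect_left


def simple_strategy_replicas(key_token, ring, replication_factor):
    """Pick replicas for a key; `ring` is a list of (token, node) sorted by token."""
    tokens = [token for token, _ in ring]
    # First replica: the first node whose token is >= the key's token (with wrap-around).
    start = bisect_left(tokens, key_token) % len(ring)
    count = min(replication_factor, len(ring))
    # Additional replicas: the next nodes clockwise, ignoring rack and data center.
    return [ring[(start + i) % len(ring)][1] for i in range(count)]


ring = [(0, "node-a"), (25, "node-b"), (50, "node-c"), (75, "node-d")]
print(simple_strategy_replicas(60, ring, 3))  # ['node-d', 'node-a', 'node-b']
```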

NetworkTopologyStrategy
NetworkTopologyStrategy is the preferred replica placement strategy when you have information about how nodes are grouped in your data center, or you have (or plan to have) your cluster deployed across multiple data centers.
This strategy allows you to specify how many replicas you want in each data center.
When deciding how many replicas to configure in each data center, the primary considerations are (1) being able to satisfy reads locally, without incurring cross-datacenter latency, and (2) failure scenarios.
The two most common ways to configure multiple data center clusters are:
• Two replicas in each data center. This configuration tolerates the failure of a single node per replication group, and still allows local reads at a consistency level of ONE.
• Three replicas in each data center. This configuration tolerates the failure of one node per replication group at a strong consistency level of LOCAL_QUORUM, or tolerates multiple node failures per data center using consistency level ONE.
Asymmetrical replication groupings are also possible depending on your use case. For example, you may want to have three replicas per data center to serve real-time application requests, and then have a single replica in a separate data center designated to running analytics. In Cassandra, the term data center is synonymous with replication group - it is a grouping of nodes configured together for replication purposes. It does not have to be a physical data center.
With NetworkTopologyStrategy, replica placement is determined independently within each data center (or replication group). The first replica per data center is placed according to the partitioner (same as with SimpleStrategy). Additional replicas in the same data center are then determined by walking the ring clockwise until a node in a different rack from the previous replica is found. If there is no such node, additional replicas will be placed in the same rack. NetworkTopologyStrategy prefers to place replicas on distinct racks if possible. Nodes in the same rack (or similar physical grouping) can easily fail at the same time due to power, cooling, or network issues.
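The rack-aware walk described above can be approximated for a single data center as follows. This is a sketch of the rule as stated in the text, not Cassandra's implementation; the function name nts_replicas_in_dc and the node and rack names are hypothetical.

```python
from bisect import bisect_left


def nts_replicas_in_dc(key_token, dc_ring, replicas_wanted):
    """Pick replicas within ONE data center.

    `dc_ring` is a list of (token, node, rack) tuples sorted by token.
    """
    size = len(dc_ring)
    tokens = [token for token, _, _ in dc_ring]
    chosen = [bisect_left(tokens, key_token) % size]  # first replica, as with SimpleStrategy
    wanted = min(replicas_wanted, size)
    while len(chosen) < wanted:
        last = chosen[-1]
        previous_rack = dc_ring[last][2]
        # Unused nodes in clockwise order, starting just after the previous replica.
        candidates = [(last + step) % size for step in range(1, size)]
        candidates = [i for i in candidates if i not in chosen]
        # Prefer a node on a different rack; otherwise fall back to the next node.
        pick = next((i for i in candidates if dc_ring[i][2] != previous_rack), candidates[0])
        chosen.append(pick)
    return [dc_ring[i][1] for i in chosen]


dc1 = [(0, "n1", "rack1"), (25, "n2", "rack1"), (50, "n3", "rack2"), (75, "n4", "rack2")]
print(nts_replicas_in_dc(10, dc1, 2))  # ['n2', 'n3']
```

Running the same function once per data center, each with its own ring and replica count, gives the full replica set for a key.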
Here is an example of how NetworkTopologyStrategy would place replicas spanning two data centers with a total replication factor of 4 (two replicas in Data Center 1 and two replicas in Data Center 2):

Notice how tokens are assigned to alternating racks.
NetworkTopologyStrategy relies on a properly configured snitch to place replicas correctly across data centers and racks, so it is important to configure your cluster to use a snitch that can correctly determine the locations of nodes in your network.
Note
NetworkTopologyStrategy should be used in place of the OldNetworkTopologyStrategy, which only supports a limited configuration of exactly 3 replicas across 2 data centers, with no control over which data center gets two replicas for any given row key. Some rows will have two replicas in the first and one in the second, while others will have two in the second and one in the first.
About Snitches
The snitch is a configurable component of a Cassandra cluster used to define how the nodes are grouped together within the overall network topology (such as rack and data center groupings). Cassandra uses this information to route inter-node requests as efficiently as possible within the confines of the replica placement strategy. The snitch does not affect requests between the client application and Cassandra (it does not control which node a client connects to). Snitches are configured for a Cassandra cluster in the cassandra.yaml configuration file. All nodes in a cluster should use the same snitch configuration. When assigning tokens, assign them to alternating racks. For example: rack1, rack2, rack3, rack1, rack2, rack3, and so on.
The following snitches are available:
SimpleSnitch
The SimpleSnitch (the default) is appropriate if you have no rack or data center information available. Single-data center deployments (or single-zone in public clouds) usually fall into this category. If using this snitch, use replication_factor=<#> when defining your keyspace strategy_options. This snitch does not recognize data center or rack information.
DseSimpleSnitch
DseSimpleSnitch is used in DataStax Enterprise (DSE) deployments only. It logically configures Hadoop analytics nodes in a separate data center from pure Cassandra nodes in order to segregate analytic and real-time workloads. It can be used for mixed-workload DSE clusters located in one physical data center. It can also be used for multi-data center DSE clusters that have exactly 2 data centers, with all analytic nodes in one data center and all Cassandra real-time nodes in another data center. If using this snitch, use Analytics or Cassandra as your data center names when defining your keyspace strategy_options.
RackInferringSnitch
RackInferringSnitch infers the topology of the network by analyzing the node IP addresses. This snitch assumes that the second octet identifies the data center where a node is located, and the third octet identifies the rack.
If using this snitch, use the second octet number of your node IPs as your data center names when defining your keyspace strategy_options. For example, 100 would be the data center name.
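The octet convention can be illustrated with a few lines of Python. The function name infer_location and the sample address are hypothetical; the sketch only shows which parts of an IPv4 address the snitch is described as using.

```python
def infer_location(ip_address):
    """Return (data_center, rack) taken from the second and third octets."""
    octets = ip_address.split(".")
    return octets[1], octets[2]


print(infer_location("110.100.200.105"))  # ('100', '200')
```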
PropertyFileSnitch
PropertyFileSnitch determines the location of nodes by referring to a user-defined description of the network details located in the property file cassandra-topology.properties. This snitch is best when your node IPs are not uniform or you have complex replication grouping requirements. See Configuring the PropertyFileSnitch for more information.
If using this snitch, you can define your data center names to be whatever you want. Just make sure the data center names you define in the cassandra-topology.properties file match the names you use for your data centers in your keyspace strategy_options.
EC2Snitch
EC2Snitch is for simple cluster deployments on Amazon EC2 where all nodes in the cluster are within the same region. Instead of using the node's IP address to infer node location, this snitch uses the AWS API to request the region and availability zone of a node. The region is treated as the data center and the availability zones are treated as racks within the data center. For example, if a node is in us-east-1a, us-east is the data center name and 1a is the rack location.
If using this snitch, use the EC2 region name (for example, us-east) as your data center name when defining your keyspace strategy_options.
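The naming convention in the example above (us-east-1a becomes data center us-east and rack 1a) amounts to splitting the availability zone at its last hyphen. The helper ec2_location is hypothetical and only mirrors the example in the text; it does not call the AWS API.

```python
def ec2_location(availability_zone):
    """Split an availability zone name into (data_center, rack)."""
    data_center, _, rack = availability_zone.rpartition("-")
    return data_center, rack


print(ec2_location("us-east-1a"))  # ('us-east', '1a')
```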
EC2MultiRegionSnitch
EC2MultiRegionSnitch is for cluster deployments on Amazon EC2 where the cluster spans multiple regions. Instead of using the node's IP address to infer node location, this snitch uses the AWS API to request the region and availability zone of a node. Regions are treated as data centers and availability zones are treated as racks within a data center. For example, if a node is in us-east-1a, us-east is the data center name and 1a is the rack location.
If using this snitch, you must configure each Cassandra node so that listen_address is set to the private IP address of the node, and broadcast_address is set to the public IP address of the node. This allows Cassandra nodes in one EC2 region to bind to nodes in another region, thus enabling multi-data center support.
If using this snitch, use the EC2 region name (for example, us-east) as your data center name when defining your keyspace strategy_options.
About Dynamic Snitching
By default, all snitches also use a dynamic snitch layer that monitors read latency and, when possible, routes requests away from poorly-performing nodes. The dynamic snitch is enabled by default, and is recommended for use in most deployments.
Dynamic snitch thresholds can be configured in the cassandra.yaml configuration file for a node.
Client Requests in Cassandra
All nodes in Cassandra are peers. A client read or write request can go to any node in the cluster. When a client connects to a node and issues a read or write request, that node serves as the coordinator for that particular client operation. 
The job of the coordinator is to act as a proxy between the client application and the nodes (or replicas) that own the data being requested. The coordinator determines which nodes in the ring should get the request based on the cluster configured partitioner and replica placement strategy.
About Write Requests
For writes, the coordinator sends the write to all replicas that own the row being written. As long as all replica nodes are up and available, they will get the write regardless of the consistency level specified by the client. The write consistency level determines how many replica nodes must respond with a success acknowledgement in order for the write to be considered successful.
For example, in a single data center 10 node cluster with a replication factor of 3, an incoming write will go to all 3 nodes that own the requested row. If the write consistency level specified by the client is ONE, the first node to complete the write responds back to the coordinator, which then proxies the success message back to the client. A consistency level of ONE means that it is possible that 2 of the 3 replicas could miss the write if they happened to be down at the time the request was made. If a replica misses a write, the row will be made consistent later via one of Cassandra's built-in repair mechanisms: hinted handoff, read repair or anti-entropy node repair.
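The relationship between consistency level and required acknowledgements can be sketched as below. Only ONE, QUORUM, and ALL are modeled, and the names quorum, REQUIRED_ACKS, and write_succeeds are hypothetical. A quorum is taken to be a majority of the replicas, that is, floor(RF / 2) + 1.

```python
def quorum(replication_factor):
    """Majority of replicas."""
    return replication_factor // 2 + 1


# Acknowledgements needed for a write to count as successful, per consistency level.
REQUIRED_ACKS = {
    "ONE": lambda rf: 1,
    "QUORUM": quorum,
    "ALL": lambda rf: rf,
}


def write_succeeds(consistency_level, replication_factor, acks_received):
    """True if enough replicas acknowledged the write."""
    return acks_received >= REQUIRED_ACKS[consistency_level](replication_factor)


print(write_succeeds("ONE", 3, 1))     # True: two replicas may still have missed the write
print(write_succeeds("QUORUM", 3, 1))  # False
```

The write is still sent to every replica in all cases; the consistency level only changes how many acknowledgements the coordinator waits for.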

About Multi-Data Center Write Requests
In multi data center deployments, Cassandra optimizes write performance by choosing one coordinator node in each remote data center to handle the requests to replicas within that data center. The coordinator node contacted by the client application only needs to forward the write request to one node in each remote data center.
If using a consistency level of ONE or LOCAL_QUORUM, only the nodes in the same data center as the coordinator node must respond to the client request in order for the request to succeed. This way, geographical latency does not impact client request response times.
Read Requests
For reads, there are two types of read requests that a coordinator can send to a replica: a direct read request and a background read repair request. The number of replicas contacted by a direct read request is determined by the consistency level specified by the client. Background read repair requests are sent to any additional replicas that did not receive a direct request. Read repair requests ensure that the requested row is made consistent on all replicas.
Thus, the coordinator first contacts the replicas specified by the consistency level. The coordinator will send these requests to the replicas that are currently responding most promptly. The nodes contacted will respond with the requested data; if multiple nodes are contacted, the rows from each replica are compared in memory to see if they are consistent. If they are not, then the replica that has the most recent data (based on the timestamp) is used by the coordinator to forward the result back to the client.
To ensure that all replicas have the most recent version of frequently-read data, the coordinator also contacts and compares the data from all the remaining replicas that own the row in the background, and if they are inconsistent, issues writes to the out-of-date replicas to update the row to reflect the most recently written values. This process is known as read repair. Read repair can be configured per column family (using read_repair_chance), and is enabled by default.
For example, in a cluster with a replication factor of 3, and a read consistency level of QUORUM, 2 of the 3 replicas for the given row are contacted to fulfill the read request. Supposing the contacted replicas had different versions of the row, the replica with the most recent version would return the requested data. In the background, the third replica is checked for consistency with the first two, and if needed, the most recent replica issues a write to the out-of-date replicas.
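The comparison step can be sketched as picking the response with the newest timestamp and noting which replicas are behind. The function name resolve_read and the replica labels are hypothetical, and each response is reduced to a single (timestamp, value) pair for simplicity.

```python
def resolve_read(responses):
    """Return (value, stale_replicas) for a dict of replica -> (timestamp, value)."""
    newest_timestamp, newest_value = max(responses.values(), key=lambda r: r[0])
    # Replicas holding an older timestamp are candidates for read repair.
    stale = sorted(
        replica for replica, (timestamp, _) in responses.items()
        if timestamp < newest_timestamp
    )
    return newest_value, stale


responses = {"replica-1": (100, "old"), "replica-2": (200, "new"), "replica-3": (200, "new")}
print(resolve_read(responses))  # ('new', ['replica-1'])
```

The coordinator would return the newest value to the client and send a write carrying that value to each stale replica.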
read_repair_chance
Specifies the probability with which read repairs should be invoked on non-quorum reads. Must be between 0 and 1. Defaults to 0.1 (perform read repair 10% of the time). Lower values improve read throughput, but increase the chances of seeing stale values if you are not using a strong consistency level.
Calculating Tokens 
Tokens are used to assign a range of data to a particular node. Assuming you are using the RandomPartitioner (the default partitioner), the approaches described in this section will ensure even data distribution.
Each node in the cluster should be assigned a token before it is started for the first time. The token is set with the initial_token property in the cassandra.yaml configuration file.
Calculating Tokens for Multiple Racks
If you have multiple racks in a single data center or a multiple data center cluster, you can use the same formula for calculating the tokens. However, you should assign the tokens to nodes in alternating racks. For example: rack1, rack2, rack3, rack1, rack2, rack3, and so on. Be sure to have the same number of nodes in each rack.
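A short sketch of this assignment for the random partitioner follows. It assumes evenly spaced tokens over the 0 to 2**127 range described earlier; the function name assign_tokens_to_racks is hypothetical.

```python
def assign_tokens_to_racks(nodes_per_rack, rack_names):
    """Return (token, rack) pairs with consecutive tokens on alternating racks."""
    total_nodes = nodes_per_rack * len(rack_names)
    return [
        (i * 2 ** 127 // total_nodes, rack_names[i % len(rack_names)])
        for i in range(total_nodes)
    ]


for token, rack in assign_tokens_to_racks(2, ["rack1", "rack2", "rack3"]):
    print(rack, token)
```

The racks cycle as rack1, rack2, rack3, rack1, rack2, rack3 while the tokens increase, so neighboring ranges on the ring never share a rack.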
auto_bootstrap
When set to true, populates a new node with a range of data when it joins an established cluster based on the setting of initial_token. If initial_token is not set, the newly added node will insert itself into the ring by splitting the token range of the most heavily loaded node. Leave set to false when initializing a brand new cluster.
partitioner
Sets the partitioning method used when assigning a row key to a particular node (also see initial_token).
Allowed values are:
• org.apache.cassandra.dht.RandomPartitioner (default)
• org.apache.cassandra.dht.ByteOrderedPartitioner
• org.apache.cassandra.dht.OrderPreservingPartitioner (deprecated)
• org.apache.cassandra.dht.CollatingOrderPreservingPartitioner (deprecated)
initial_token
The initial token assigns the node token position in the ring, and assigns a range of data to the node when it first starts up. The initial token can be left unset when introducing a new node to an established cluster using auto_bootstrap. Otherwise, the token value depends on the partitioner you are using. With the random partitioner, this value will be a number between 0 and 2**127. With the byte order preserving partitioner, this value will be a byte array of hex values based on your actual row key values. With the order preserving and collated order preserving partitioners, this value will be a UTF-8 string based on your actual row key values. See Calculating Tokens for more information.
Managing and Accessing Data in Cassandra
About Writes in Cassandra
About Compaction 
In the background, Cassandra periodically merges SSTables together into larger SSTables using a process called compaction. Compaction merges row fragments together, removes expired tombstones (deleted columns), and rebuilds primary and secondary indexes. Since the SSTables are sorted by row key, this merge is efficient (no random disk I/O).
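Because each input is already sorted by row key, the merge is a streaming operation. The sketch below models an SSTable as a sorted list of (row_key, timestamp, value) tuples with None standing in for a tombstone. It treats every tombstone as expired and keeps only the newest entry per key, which is a deliberate simplification of real compaction. The function name compact is hypothetical.

```python
import heapq
from itertools import groupby

TOMBSTONE = None  # marker for a deleted value in this toy model


def compact(sstables):
    """Merge sorted SSTables into one, keeping the newest live entry per row key."""
    merged = heapq.merge(*sstables, key=lambda entry: entry[0])  # sequential, no re-sort
    result = []
    for _, entries in groupby(merged, key=lambda entry: entry[0]):
        newest = max(entries, key=lambda entry: entry[1])
        if newest[2] is not TOMBSTONE:
            result.append(newest)
    return result


older = [("a", 1, "x"), ("c", 1, "z")]
newer = [("a", 2, "y"), ("b", 2, "w"), ("c", 3, TOMBSTONE)]
print(compact([older, newer]))  # [('a', 2, 'y'), ('b', 2, 'w')]
```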
Once a newly merged SSTable is complete, the input SSTables are marked as obsolete and eventually deleted by the JVM garbage collection (GC) process. However, during compaction, there is a temporary spike in disk space usage and disk I/O.
Compaction impacts read performance in two ways. While a compaction is in progress, it temporarily increases disk I/O and disk utilization which can impact read performance for reads that are not fulfilled by the cache. However, after a compaction has been completed, off-cache read performance improves since there are fewer SSTable files on disk that need to be checked in order to complete a read request.
As of Cassandra 1.0, there are two different compaction strategies that you can configure on a column family: size-tiered compaction or leveled compaction. See Tuning Compaction for a description of these compaction strategies.
About Transactions and Concurrency Control 
Unlike relational databases, Cassandra does not offer fully ACID-compliant transactions. There are no locks or transactional dependencies when concurrently updating multiple rows or column families.
ACID is an acronym used to describe transactional behavior in relational database systems, which stands for:
• Atomic. Everything in a transaction succeeds or the entire transaction is rolled back.
• Consistent. A transaction cannot leave the database in an inconsistent state.
• Isolated. Transactions cannot interfere with each other.
• Durable. Completed transactions persist in the event of crashes or server failure.
Cassandra trades transactional isolation and atomicity for high availability and fast write performance. In Cassandra, a write is atomic at the row-level, meaning inserting or updating columns for a given row key will be treated as one write operation. Cassandra does not support transactions in the sense of bundling multiple row updates into one all-or-nothing operation. Nor does it roll back when a write succeeds on one replica, but fails on other replicas. It is possible in Cassandra to have a write operation report a failure to the client, but still actually persist the write to a replica. 
For example, when using a write consistency level of QUORUM with a replication factor of 3, Cassandra sends the write to all 3 replicas and waits for an acknowledgement from 2 of them. If the write succeeds on only one replica, Cassandra reports a write failure to the client. However, the write is not automatically rolled back on the replica where it succeeded.
Cassandra uses timestamps to determine the most recent update to a column. The timestamp is provided by the client application. The latest timestamp always wins when requesting data, so if multiple client sessions update the same columns in a row concurrently, the most recent update is the one that will eventually persist. 
Writes in Cassandra are durable. All writes to a replica node are recorded both in memory and in a commit log before they are acknowledged as a success. If a crash or server failure occurs before the memory tables are flushed to disk, the commit log is replayed on restart to recover any lost writes.
About Inserts and Updates
Any number of columns may be inserted at the same time. When inserting or updating columns in a column family, the client application specifies the row key to identify which column records to update. The row key is similar to a primary key in that it must be unique for each row within a column family. However, unlike a primary key, inserting a duplicate row key will not result in a primary key constraint violation - it will be treated as an UPSERT (update the specified columns in that row if they exist or insert them if they do not).
Columns are only overwritten if the timestamp in the new version of the column is more recent than the existing column, so precise timestamps are necessary if updates (overwrites) are frequent. The timestamp is provided by the client, so the clocks of all client machines should be synchronized using NTP (network time protocol).
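The timestamp rule can be sketched in a few lines. This is a simplified model, assuming a row is a dictionary that maps a column name to a (value, timestamp) pair; the function name apply_write is made up for this example, and a write with an equal timestamp is simply ignored here.

```python
def apply_write(row, column, value, timestamp):
    """Overwrite a column only if the incoming timestamp is more recent."""
    current = row.get(column)
    if current is None or timestamp > current[1]:
        row[column] = (value, timestamp)
    return row


row = {}
apply_write(row, "email", "first@example.com", timestamp=10)
# A write from a client whose clock runs behind is silently ignored.
apply_write(row, "email", "stale@example.com", timestamp=5)
print(row["email"])
```

The second write arrives later but carries an older timestamp, so it loses. This is the reason the client clocks need to be synchronized.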
About Deletes
When deleting a row or a column in Cassandra, there are a few things to be aware of that may differ from what one would expect in a relational database.
1. Deleted data is not immediately removed from disk. Data that is inserted into Cassandra is persisted to SSTables on disk. Once an SSTable is written, it is immutable (the file is not updated by further DML operations). This means that a deleted column is not removed immediately. Instead a marker called a tombstone is written to indicate the new column status. Columns marked with a tombstone exist for a configured time period (defined by the gc_grace_seconds value set on the column family), and then are permanently deleted by the compaction process after that time has expired.
2. A deleted column can reappear if routine node repair is not run. Marking a deleted column with a tombstone ensures that a replica that was down at the time of delete will eventually receive the delete when it comes back up again. However, if a node is down longer than the configured time period for keeping tombstones (defined by the gc_grace_seconds value set on the column family), then the node can possibly miss the delete altogether, and replicate deleted data once it comes back up again. To prevent deleted data from reappearing, administrators must run node repair on every node in the cluster at least once within each gc_grace_seconds period (10 days by default).
3. The row key for a deleted row may still appear in range query results. When you delete a row in Cassandra, it marks all columns for that row key with a tombstone. Until those tombstones are cleared by compaction, you have an empty row key (a row that contains no columns). These deleted keys can show up in results of get_range_slices() calls. If your client application performs range queries on rows, you may want to have it filter out row keys that return empty column lists.
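Two of the points above lend themselves to a short sketch. It assumes a range query result has already been converted by the client into a dictionary of row key to columns; the names tombstone_purgeable and live_rows are made up for this example, and 864000 seconds is the 10-day default mentioned above.

```python
DEFAULT_GC_GRACE_SECONDS = 864000  # 10 days


def tombstone_purgeable(deleted_at, now, gc_grace_seconds=DEFAULT_GC_GRACE_SECONDS):
    """A tombstone may be dropped by compaction once the grace period has passed."""
    return now - deleted_at >= gc_grace_seconds


def live_rows(range_result):
    """Drop row keys whose column list is empty (rows deleted but not yet compacted)."""
    return {key: columns for key, columns in range_result.items() if columns}


result = {"alice": {"email": "a@example.com"}, "bob": {}}
print(live_rows(result))
```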
Hinted Handoff Writes
Hinted handoff is an optional feature of Cassandra that reduces the time to restore a failed node to consistency once the failed node returns to the cluster. It can also be used for absolute write availability for applications that cannot tolerate a failed write, but can tolerate inconsistent reads.
When a write is made, Cassandra attempts to write to all replicas for the affected row key. If a replica is known to be down at the time the write occurs, a corresponding live replica will store a hint. The hint consists of location information (the replica node and row key that require a replay), as well as the actual data being written. There is minimal overhead to storing hints on replica nodes that already own the written row, since the data being written is already accounted for by the usual write process. The hint data itself is relatively small in comparison to most data rows.
If all replicas for the affected row key are down, it is still possible for a write to succeed if using a write consistency level of ANY. Under this scenario, the hint and written data are stored on the coordinator node, but will not be available to reads until the hint gets written to the actual replicas that own the row. The ANY consistency level provides absolute write availability at the cost of consistency, as there is no guarantee as to when written data will be available to reads (depending how long the replicas are down). Using the ANY consistency level can also potentially increase load on the cluster, as coordinator nodes must temporarily store extra rows whenever a replica is not available to accept a write.
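The hint mechanism described above can be modeled roughly as follows. This is a toy in-memory sketch, not Cassandra's code: the Node class, write, and replay_hints are all hypothetical names, and the first live replica is arbitrarily chosen to hold the hints.

```python
class Node:
    def __init__(self, name):
        self.name = name
        self.up = True
        self.data = {}    # row_key -> value
        self.hints = []   # (target_node, row_key, value)


def write(coordinator, replicas, row_key, value):
    """Write to every live replica and keep a hint for each replica that is down.

    Returns the number of replicas that accepted the write.
    """
    live = [replica for replica in replicas if replica.up]
    down = [replica for replica in replicas if not replica.up]
    for replica in live:
        replica.data[row_key] = value
    # A live replica stores the hint; if no replica is live (the ANY case),
    # the coordinator stores both the hint and the data.
    hint_holder = live[0] if live else coordinator
    for target in down:
        hint_holder.hints.append((target, row_key, value))
    return len(live)


def replay_hints(holder):
    """Deliver stored hints to targets that are back up; keep the rest."""
    remaining = []
    for target, row_key, value in holder.hints:
        if target.up:
            target.data[row_key] = value
        else:
            remaining.append((target, row_key, value))
    holder.hints = remaining
```

A return value of 0 corresponds to the case where only a consistency level of ANY would report success: the data exists solely as a hint on the coordinator and is not readable until it is replayed.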
About Reads in Cassandra
When a read request for a row comes in to a node, the row must be combined from all SSTables on that node that contain columns from the row in question, as well as from any unflushed memtables, to produce the requested data. To optimize this piecing-together process, Cassandra uses an in-memory structure called a bloom filter: each SSTable has a bloom filter associated with it that is used to check if any data for the requested row exists in the SSTable before doing any disk I/O. As a result, Cassandra is very performant on reads when compared to other storage systems, even for read-heavy workloads.
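To show why a bloom filter saves disk I/O, here is a minimal sketch of the data structure. It is a generic bloom filter, not Cassandra's implementation; the sizes, the number of hash functions, and the use of SHA-256 are arbitrary choices for illustration.

```python
import hashlib


class BloomFilter:
    def __init__(self, size_bits=1024, hash_count=3):
        self.size_bits = size_bits
        self.hash_count = hash_count
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key):
        # Derive several bit positions per key by salting the hash input.
        for i in range(self.hash_count):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, key):
        for position in self._positions(key):
            self.bits |= 1 << position

    def might_contain(self, key):
        # False means "definitely absent"; True means "possibly present".
        return all((self.bits >> position) & 1 for position in self._positions(key))


sstable_filter = BloomFilter()
sstable_filter.add("row-42")
if sstable_filter.might_contain("row-42"):
    print("read this SSTable from disk")
```

A negative answer is always correct, so an SSTable whose filter says no can be skipped without touching the disk. A positive answer can occasionally be wrong, which only costs an unnecessary read.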
As with any database, reads are fastest when the most in-demand data (or hot working set) fits into memory. Although all modern storage systems rely on some form of caching to allow for fast access to hot data, not all of them degrade gracefully when the cache capacity is exceeded and disk I/O is required. Cassandra's read performance benefits from built-in caching, but it also does not dip dramatically when random disk seeks are required. When I/O activity starts to increase in Cassandra due to increased read load, it is easy to remedy by adding more nodes to the cluster.
For rows that are accessed frequently, Cassandra has a built-in key cache (and an optional row cache). See Tuning the Cache for more information about optimizing read performance using the built-in caching features.
Tuning Cassandra 
Effective tuning depends not only on the types of operations your cluster performs most frequently, but also on the shape of the data itself. For example, Cassandra's memtables have overhead for index structures on top of the actual data they store. If the size of the values stored in the columns is small compared to the number of columns and rows themselves (sometimes called skinny rows), this overhead can be substantial. Thus, the optimal tuning for this type of data is quite different from the optimal tuning for a small number of columns with more data (fat rows).
Tuning the Cache
Cassandra's built-in key and row caches can provide very efficient data caching. Some Cassandra production deployments have leveraged Cassandra's caching features to the point where dedicated caching tools such as memcached could be completely replaced. Such deployments not only remove a redundant layer from the stack, but they also achieve the fundamental efficiency of strengthening caching functionality in the lower tier where the data is already being stored. Among other advantages, this means that caching never needs to be restarted in a completely cold state.
Cache tuning should be done using small, incremental adjustments and then monitoring the effects of each change. See Monitoring and Adjusting Cache Performance for more information about monitoring tuning changes to a column family cache. With proper tuning, key cache hit rates of 85% or better are possible. Row caching, when feasible, can save the system from performing any disk seeks at all when fetching a cached row. Whenever growth in the read load begins to impact your cache hit rates, you can add capacity to quickly restore optimal levels of caching.
How Caching Works
If both row and key caches are configured, the row cache will return results whenever possible. In the case of a row cache miss, the key cache may still provide a hit, assuming that it holds a larger number of keys than the row cache.
If a read operation hits the row cache, the entire requested row is returned without a disk seek. If a row is not in the row cache, but is present in the key cache, the key cache is used to find the exact location of the row on disk in the SSTable. If a row is not in the key cache, the read operation will populate the key cache after accessing the row on disk so subsequent reads of the row can benefit. Each hit on a key cache can save one disk seek per SSTable.
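The lookup order can be sketched as below. The dictionaries are stand-ins, assumed for illustration only: sstable_index plays the role of the on-disk index lookup, and sstable_data maps a position to a row. A single SSTable is modeled.

```python
def read_row(row_key, row_cache, key_cache, sstable_index, sstable_data):
    """Return (row, path), where path names which cache, if any, served the read."""
    if row_key in row_cache:
        # The whole row is in memory: no disk seek at all.
        return row_cache[row_key], "row cache hit"
    if row_key in key_cache:
        # The position is known: go straight to the row on disk.
        position = key_cache[row_key]
        return sstable_data[position], "key cache hit"
    # Neither cache helps: look up the position, then remember it.
    position = sstable_index[row_key]
    key_cache[row_key] = position
    return sstable_data[position], "cache miss"


data = {0: {"name": "alice"}, 1: {"name": "bob"}}
index = {"alice": 0, "bob": 1}
key_cache = {}
print(read_row("bob", {}, key_cache, index, data))
print(read_row("bob", {}, key_cache, index, data))
```

The first read of a row populates the key cache, so the second read of the same row skips the index lookup.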
Monitoring and Adjusting Cache Performance
Careful, incremental monitoring of cache changes is the best way to maximize benefit from Cassandra's built-in caching features. Adjustments that increase cache hit rate are likely to use more system resources, such as memory. After making changes to the cache configuration, it is best to monitor Cassandra as a whole for unintended impact on the system.
For each node and each column family, you can view cache hit rate, cache size, and number of hits by expanding org.apache.cassandra.db in the MBeans tab.
Monitor new cache settings not only for hit rate, but also to make sure that memtables and heap size still have sufficient memory for other operations. If you cannot maintain the desired key cache hit rate of 85% or better, add nodes to the system and re-test until you can meet your caching requirements.
Row cache is disabled by default. Caching large rows can very quickly consume memory. Row cache rates should be increased carefully in small increments. If row cache hit rates cannot be tuned to above 30%, it may make more sense to leave row caching disabled.
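The two rules of thumb above (a key cache hit rate of 85% or better, and a row cache hit rate above 30%) can be turned into a small check. The function names and the idea of passing raw hit and request counts are assumptions made for this sketch; the counts themselves would come from whatever monitoring you use.

```python
def hit_rate(hits, requests):
    """Fraction of requests served from the cache (0.0 when there were none)."""
    return hits / requests if requests else 0.0


def cache_advice(key_hits, key_requests, row_hits, row_requests):
    """Compare observed hit rates with the thresholds quoted in the text."""
    advice = []
    if hit_rate(key_hits, key_requests) < 0.85:
        advice.append("key cache below 85%: consider adding nodes and re-testing")
    # Only judge the row cache if it is actually in use.
    if row_requests and hit_rate(row_hits, row_requests) <= 0.30:
        advice.append("row cache not above 30%: consider leaving it disabled")
    return advice


for line in cache_advice(key_hits=700, key_requests=1000, row_hits=100, row_requests=1000):
    print(line)
```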
Configuring the Column Family Key Cache
The key cache holds the location of row keys in memory on a per-column family basis. High levels of key caching are recommended for most production scenarios. Turning this level up can optimize reads (after the cache warms) when there is a large number of rows that are accessed frequently.
The caching of 200,000 row keys is enabled by default. This can be adjusted by setting keys_cached on a column family. For example, using Cassandra CLI:
[default@demo] UPDATE COLUMN FAMILY users WITH keys_cached=205000;
Key cache performance can be monitored by using nodetool cfstats and examining the reported 'Key cache hit rate'.
keys_cached
Defines how many key locations will be kept in memory per SSTable (see rows_cached for details on caching actual row values). This can be a fixed number of keys or a fraction (for example 0.5 means 50 percent).
DataStax recommends a fixed sized cache over a relative sized cache. Only use relative cache sizing when you are confident that the data in the column family will not continue to grow over time. Otherwise, your cache will grow as your data set does, potentially causing unplanned memory pressure.
Configuring the Column Family Row Cache
The row cache holds the entire contents of the row in memory. In cases where rows are large or frequently modified/removed, row caching can actually be detrimental to performance. For this reason, row caching is disabled by default.
Row cache should remain disabled for column families with large rows or high write:read ratios. In these situations, row cache can very quickly consume a large amount of available memory. Note also that, when a row cache is operating efficiently, it keeps Java garbage collection processes very active.
Row caching is best for workloads that access a small subset of the overall rows, and within those rows, all or most of the columns are returned. For this use case a row cache keeps the most accessed rows hot in memory, and can have substantial performance benefits.
To enable row cache on a column family, set rows_cached to the desired number of rows. To enable off-heap row caching, set row_cache_provider to SerializingCacheProvider. For example, using Cassandra CLI:
[default@demo] UPDATE COLUMN FAMILY users WITH rows_cached=100000
AND row_cache_provider='SerializingCacheProvider';
Data Consistency in Cassandra
Consistency refers to how up-to-date and synchronized a row of data is on all of its replicas. Cassandra extends the concept of eventual consistency by offering tunable consistency. For any given read or write operation, the client application decides how consistent the requested data should be.
Tunable Consistency for Client Requests 
Consistency levels in Cassandra can be set on any read or write query. This allows application developers to tune consistency on a per-query basis depending on their requirements for response time versus data accuracy. Cassandra offers a number of consistency levels for both reads and writes.
About Write Consistency
When you do a write in Cassandra, the consistency level specifies on how many replicas the write must succeed before returning an acknowledgement to the client application.
The following consistency levels are available, with ANY being the lowest consistency (but highest availability), and ALL being the highest consistency (but lowest availability). QUORUM is a good middle-ground ensuring strong consistency, yet still tolerating some level of failure.
A quorum is calculated as (rounded down to a whole number):
(replication_factor / 2) + 1
For example, with a replication factor of 3, a quorum is 2 (can tolerate 1 replica down). With a replication factor of 6, a quorum is 4 (can tolerate 2 replicas down).
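The quorum formula and the two examples can be checked with a short calculation. The function names are made up for this sketch; integer division does the rounding down.

```python
def quorum(replication_factor):
    """(replication_factor / 2) + 1, rounded down to a whole number."""
    return replication_factor // 2 + 1


def tolerated_failures(replication_factor):
    """How many replicas can be down while a quorum is still reachable."""
    return replication_factor - quorum(replication_factor)


for rf in (3, 6):
    print(f"RF={rf}: quorum={quorum(rf)}, tolerates {tolerated_failures(rf)} down")
```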
Level and description:
• ANY. A write must be written to at least one node. If all replica nodes for the given row key are down, the write can still succeed once a hinted handoff has been written. Note that if all replica nodes are down at write time, an ANY write will not be readable until the replica nodes for that row key have recovered.
• ONE. A write must be written to the commit log and memory table of at least one replica node.
• QUORUM. A write must be written to the commit log and memory table on a quorum of replica nodes.
• LOCAL_QUORUM. A write must be written to the commit log and memory table on a quorum of replica nodes in the same data center as the coordinator node. Avoids latency of inter-data center communication.
• EACH_QUORUM. A write must be written to the commit log and memory table on a quorum of replica nodes in all data centers.
• ALL. A write must be written to the commit log and memory table on all replica nodes in the cluster for that row key.
Read Consistency 
When you do a read in Cassandra, the consistency level specifies how many replicas must respond before a result is returned to the client application. Cassandra checks the specified number of replicas for the most recent data to satisfy the read request (based on the timestamp).
The following consistency levels are available, with ONE being the lowest consistency (but highest availability), and ALL being the highest consistency (but lowest availability). QUORUM is a good middle-ground ensuring strong consistency, yet still tolerating some level of failure. 
A quorum is calculated as (rounded down to a whole number): 
(replication_factor / 2) + 1 
For example, with a replication factor of 3, a quorum is 2 (can tolerate 1 replica down). With a replication factor of 6, a quorum is 4 (can tolerate 2 replicas down).
Level and description:
• ONE. Returns a response from the closest replica (as determined by the snitch). By default, a read repair runs in the background to make the other replicas consistent.
• QUORUM. Returns the record with the most recent timestamp once a quorum of replicas has responded.
• LOCAL_QUORUM. Returns the record with the most recent timestamp once a quorum of replicas in the same data center as the coordinator node has responded. Avoids latency of inter-data center communication.
• EACH_QUORUM. Returns the record with the most recent timestamp once a quorum of replicas in each data center of the cluster has responded.
• ALL. Returns the record with the most recent timestamp once all replicas have responded. The read operation will fail if a replica does not respond.
Note:
LOCAL_QUORUM and EACH_QUORUM are designed for use in multi-data center clusters using a rack-aware replica placement strategy (such as NetworkTopologyStrategy) and a properly configured snitch.
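To make the read consistency levels concrete, the sketch below computes how many replicas have to respond for each level in a multi-data center cluster. It assumes the replication factor is given per data center, as it would be with NetworkTopologyStrategy; the function name and the dictionary layout are made up for this example.

```python
def required_responses(level, rf_by_dc, local_dc):
    """Number of replica responses needed before a read returns."""

    def quorum(replication_factor):
        return replication_factor // 2 + 1

    total_rf = sum(rf_by_dc.values())
    if level == "ONE":
        return 1
    if level == "QUORUM":
        return quorum(total_rf)
    if level == "LOCAL_QUORUM":
        return quorum(rf_by_dc[local_dc])
    if level == "EACH_QUORUM":
        # A quorum in every data center, added up across data centers.
        return sum(quorum(rf) for rf in rf_by_dc.values())
    if level == "ALL":
        return total_rf
    raise ValueError(f"unknown consistency level: {level}")


cluster = {"dc1": 3, "dc2": 3}
for level in ("ONE", "QUORUM", "LOCAL_QUORUM", "EACH_QUORUM", "ALL"):
    print(level, required_responses(level, cluster, local_dc="dc1"))
```

With two data centers of three replicas each, LOCAL_QUORUM needs only two responses, all from the coordinator's own data center, which is where the latency saving comes from.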


The CAP theorem
In 2000, in his keynote speech at the ACM Symposium on Principles of Distributed Computing, Eric Brewer said: “A distributed system requiring always-on, highly-available operations cannot guarantee the illusion of coherent, consistent single system operation in the presence of network partitions, which cut communication between active servers”.

Consistency
A consistent system is defined as one that responds with the same output for the same request at the same time, across all the replicas. Loosely, one can say a consistent system is one where each server returns the right response to each request.
Relational databases are good for this task because they comply with the ACID properties. Suppose two customers of an online shop try to buy the last copy of a book at the same time. With a relational database, one customer has to wait until the other customer's request has been processed and the database is consistent again. This may add a few milliseconds of wait for the customer who came later.
An eventually consistent database system (where consistency of data across the distributed servers may not be guaranteed immediately) may have shown the book as available at checkout to both customers. This leads to a back order, and one of the customers has to be refunded. This may or may not be a good policy. A large number of back orders may affect the shop’s reputation, and there may also be financial repercussions.
Availability
Availability, in simple terms, is responsiveness; a system that’s always available to serve. The funny thing about availability is that sometimes a system becomes unavailable exactly when you need it the most.
Availability is the key component for extremely loaded services. Bad availability leads to bad user experience, dissatisfied customers, and financial losses.
Partition-tolerance
Network partitioning is defined as the inability to communicate between two or more subsystems in a distributed system. This can be due to someone walking carelessly in a data center and snapping the cable that connects a machine to the cluster, a network outage between two data centers, dropped packets, or wrong configuration. For example, if a system has some nodes in subnet A and some in subnet B, a faulty switch between the two subnets leaves the machines in subnet A unable to exchange messages with the machines in subnet B. A partition-tolerant system is one that keeps operating during a network partition: the network is allowed to lose arbitrarily many messages sent from one node to another, and even if the cable between two nodes is cut, the system still responds to requests.

The significance of the CAP theorem
Once you decide to scale up, the first thing that comes to mind is vertical scaling, which means using beefier servers with more RAM, more powerful processor(s), and bigger disks. For further scaling, you need to go horizontal, which means adding more servers. Once your system becomes distributed, the CAP theorem comes into play: in a distributed system, you can choose only two out of consistency, availability, and partition tolerance. So, let’s see how choosing two out of the three options affects the system behavior:
CA system: In this system, you drop partition-tolerance for consistency and availability. This happens when you put everything related to a transaction on one machine or a system that fails like an atomic unit, like a rack. This system will have serious problems in scaling.
CP system: The opposite of a CA system is a CP system. In a CP system, availability is sacrificed for consistency and partition-tolerance. What does this mean? If the system is available to serve the requests, data will be consistent. In the event of a node failure, some data will not be available. A sharded database is an example of such a system.
AP system: An available and partition-tolerant system is like an always-on system that is at risk of producing conflicting results in the event of a network partition. This is good for user experience: your application stays available, and inconsistency in rare events may be acceptable for some use cases. In the book example, it may be better to back order a few unfortunate customers because of an inconsistency than to have a lot of users leave without making any purchases because of the system’s poor availability.
Eventually consistent (also known as a BASE system): The AP system makes more sense when viewed from an uptime perspective: it’s simple and provides a good user experience.
Cassandra chooses Availability & Partition Tolerance over Consistency.

