我有一個非常大的Cassandra表,現在我有了spark-Cassandra與以下代碼的連接。
import pandas as pd
import numpy as np
from pyspark import *
import os
from pyspark.sql import SQLContext
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.1 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'
conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.connection.port", "9042").setAppName("Sentinel").setMaster("spark://Local:7077")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
table_df = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table='movies', keyspace='movie_lens')\
.load()\
主鍵是Movie_id,它是一個整數。load()將整個表加載到內存中,這是我要避免的。我得到的一個方法是使用過濾器
table_df = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table='movies', keyspace='movie_lens')\
.load()\
.filter("movie_id = 37032")
但過濾器是否真的阻止了將整個表加載到內存中?或者先加載然后過濾。另外,我還要查詢很多身份證。假設我需要1000個身份證,每天都在換。那怎么辦呢?
是的,如果您在分區鍵上進行查詢,Spark Cassandra連接器將執行so-called“謂詞下推”,并且將僅從特定查詢加載數據(
.load
函數將只加載元數據,實際的數據加載將在您確實需要數據來執行操作時第一次發生)。關于Spark-Cassandra連接器中何時發生謂詞下推,有很好的文檔規則。您還可以通過運行table_df.explain()
來檢查這一點,并為標有星號*
的過濾器查找PushedFilters
部分。如果您需要查找多個id,那么您可以使用
.isin
過濾器,但不建議使用Cassandra。最好用IDs創建一個dataframe,并用cassandradataframe執行so-called直接連接(自從scc2.5用于dataframes,或者更早用于RDDs以來,它就可用了)。我在卡桑德拉有一篇關于加入數據的博文