当前位置:首页 > 公司管理咨询 > 正文

Spark读取elasticsearch数据指南

最近要在Sparkjob中通过SparkSQL的方式读取Elasticsearch数据,踩了一些坑,总结于此。环境说明Sparkjob的编写语言为Scala,scala-library的版本为2.11.8。Spark相关依赖包的版本为2.3.2,如spark-core、spark-sql。Elast...

最近要在Sparkjob中通过SparkSQL的方式读取Elasticsearch数据,踩了一些坑,总结于此。环境说明Sparkjob的编写语言为Scala,scala-library的版本为2.11......

最近要在Sparkjob中通过SparkSQL的方式读取Elasticsearch数据,踩了一些坑,总结于此。

环境说明

Sparkjob的编写语言为Scala,scala-library的版本为2.11.8。

Spark相关依赖包的版本为2.3.2,如spark-core、spark-sql。

Elasticsearch数据

schema

{"settings":{"number_of_replicas":1},"mappings":{"label":{"properties":{"docId":{"type":"keyword"},"labels":{"type":"nested","properties":{"id":{"type":"long"},"label":{"type":"keyword"}}},"itemId":{"type":"long"}}}}}

sampledata

{"took":141,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":17370929,"max_score":1.0,"hits":[{"_index":"aen-label-v1","_type":"label","_id":"123_ITEM","_score":1.0,"_source":{"docId":"123_ITEM","labels":[{"id":7378,"label":"1kg"}],"itemId":123}},{"_index":"aen-label-v1","_type":"label","_id":"456_ITEM","_score":1.0,"_source":{"docId":"456_ITEM","labels":[{"id":7378,"label":"2kg"}],"itemId":456}}]}}
准备工作

既然要用SparkSQL,当然少不了其对应的依赖,

depencies{implementation':spark-core_2.11:2.3.2'implementation':spark-sql_2.11:2.3.2'}

对于ES的相关库,如同官网所说,要在Spark中访问ES,需要将elasticsearch-hadoop依赖包加入到Sparkjob运行的类路径中,具体而言就是添加到Sparkjob工程的依赖中,公司的nexus中当前最新的版本为7.15.0,且目前我们是使用gradle管理依赖,故添加依赖的代码如下,

depencies{implementation':elasticsearch-hadoop:7.15.0'}
本地测试

对于Spark,基于资源管理器的不同,可以在两种模式下运行:本地模式和集群模式,可通过--master参数来指定资源管理器的方式。本地模式时,不依赖额外的Spark集群,Spark将在同一台机器上运行所有内容,非常方便用于本地测试,对于SparkSQL,只需要在创建SparkSession时采用local的模式即可,

classMyUtilsextsSerializable{defesHost()=s""//localmodedefgetLocalSparkSession:SparkSession=().master("local").getOrCreate()//clustermodedefgetSparkSession:SparkSession=().enableHiveSupport().config("","3600").getOrCreate()}
测试代码
objectLocalTestextsLazyLogging{defmain(args:Array[String]):Unit={newLocalTest().run()}}classLocalTest{defrun():Unit={valmyUtils=newMyUtilsvalspark=_varstart=()valattributeId=7378LvallabelNames=Array("aen-label-retail","aen-label-seller")("es").option("",()).option("","9200").option("",value=true).option("",(",").join((labelNames:_*))+"/label").option("",2000).load().createOrReplaceTempView("temp_labels")valsqlDf=("selectitemId,labelsfromtemp_labelswhereitemIdin(123,456)")valnewDf=(row={vallabels=[Seq[Row]]("labels")vallabelValue=(p=[Long]("id")==attributeId).map(p=[String]("label"))([Long]("itemId"),attributeId,)}).withColumn("final_result",lit("PASS")).toDF("itemId","attributeId","label","final_result")valfinalDf=("itemId","attributeId","label","result")()()varemptyDf=(col("label").isNotNull).toDF("itemId","attributeId","label","result")emptyDf=(finalDf)()()(col("itemId")===6238081929Landcol("label").notEqual(col("result"))).show()valattributeTypeIds=(3)(100)valattributeTypeIdsStr=(",").join((attributeTypeIds:_*))println(attributeTypeIdsStr)_emptyDf=(!col("itemId").isin((Long2long).toList:_*))(false)}}
知识点

SparkSQLDataSources

SparkSQL通过DataFrameReader类支持读取各种类型的数据源,比如Parquet、ORC、JSON、CSV等格式的文件,Hivetable,以及其他database。而Elasticsearch只不过是众多数据源中的一种,DataFrameReader通过format()指定数据源格式,通过option()定制对应数据源下的配置,最后通过load()加载生成DataFrame,也就是Dataset[Row]的类型别名。有了DataFrame,就可以创建一个临时表,然后就能以SQL的方式读取数据。

在以前,Elasticsearch在format()中对应的source名需要是全包名,而在以及之后的版本,source名称简化为es。

SparkSQL中DataFrame常用API

(),打印schema

(),查看数据列表,默认是truncate前20条,传false时列出全部数据。

("view_name"),构建临时表视图,方便后续SQL操作。

(),添加新列或替换现有列。("final_result",lit("PASS")),通过lit添加常量列。

(col("label").isNotNull),用指定的条件过滤行。

("itemId","attributeId"),按指定列对行去重,返回新的数据集。

(otherDf),将两个DataFrame的记录合并且不去重,相当于unionall。

("itemId","attributeId","label","final_result"),为df各列指定一个有意义的名称。

Scala与Java类型映射

Array[T]-T[]

Scala与Java类型转换

_newDf=(!col("itemId").isin((Long2long).toList:_*))

Scala中的:_*

:_*是typeascription的一个特例,它会告诉编译器将序列类型的单个参数视为变参数序列,即varargs。应用例子,

valindices=Array("aen-label","aen-label-seller")(",").join((indices:_*))
踩的坑

该配置项表示连接器是否用于WAN上的云或受限环境如AWS中的Elasticsearch实例,默认为false,而公司的Elasticsearch集群是在AWS上的,point只能在内网访问,因而刚开始测试时,遇到如下报错,

Exceptioninthread"main":(:159)(:223)$lzycompute(:73)(:72)(:44)$anonfun$partitions$2.apply(:253)$anonfun$partitions$2.apply(:251)(:121)(:251)(:46)$anonfun$partitions$2.apply(:253)$anonfun$partitions$2.apply(:251)(:121)(:251)(:46)$anonfun$partitions$2.apply(:253)$anonfun$partitions$2.apply(:251)(:121)(:251)(:46)$anonfun$partitions$2.apply(:253)$anonfun$partitions$2.apply(:251)(:121)(:251)(:46)$anonfun$partitions$2.apply(:253)$anonfun$partitions$2.apply(:251)(:121)(:251)(:340)(:38)$apache$spark$sql$Dataset$collectFromPlan(:3278)$anonfun$head$1.apply(:2489)$anonfun$head$1.apply(:2489)$anonfun$52.apply(:3259)$.withNewExecutionId(:77)(:3258)(:2489)(:2703)(:254)(:723)

通过option("",value=true)将配置项设置为true后恢复正常。

_

在遍历DataFrame时遇到如下编译错误,

(Int,String,etc)andProducttypes(caseclasses)_

在处理DataFrame之前需要加上_,用于将常见的Scala对象转换为DataFrame,通常在获取SparkSession后立马import。

SparkSQL读取hive表中array类型时,对于Scala语言,得到的类型是WrappedArray而不是Array

当我们通过createOrReplaceTempView("temp_labels")构建一个临时表视图后,就可以通过SQL像操作hive表那样读取数据。例如读取指定的列,

valsqlDf=("selectitemId,labelsfromtemp_labelswhereitemIdin(123,456)")

通过()可以看到sqlDf的schema长这样,

root|--itemId:long(nullable=true)|--labels:array(nullable=true)||--element:struct(containsNull=true)|||--id:long(nullable=true)|||--label:string(nullable=true)

labels是包含struct的数组,于是从row中将labels列读出时想尝试转换为Array,

valnewDf=(row={vallabels=[Array[Row]]("labels")vallabelValue=(p=[Long]("id")==attributeId).map(p=[String]("label"))([Long]("itemId"),attributeId,)})

结果报错如下,

:$ofRefcannotbecastto[;

可以看到SparkSQL在读取表中数组列时,是用的来存储结果的,看其类定义可知,它是间接实现Seq接口的,所以也可用[Seq[Row]]("labels")来读取。这里需要注意的是,Array[T]虽然在Scala源码定义中是class,但其对标的Java类型是原生数组T[]。

判断Column是否为null时,需要用isnull或isnotnull,而不是===或!==

对于错误的用法,filter并不会生效,就像下面这样

(col("label")!==null)

这一点和hive表以及MySQL表判断字段是否为null,是保持一致的,应该像下面这样,

(col("label").isNotNull)
最终代码
{DataFrame,Row,SaveMode,SparkSession}objectTestMainextsLazyLogging{defmain(args:Array[String]):Unit={valmyUtils=newMyUtilsnewTestApp(myUtils).run()}}classTestApp(myUtils:MyUtils)extsSerializablewithLazyLogging{defesDf(spark:SparkSession,indices:Array[String]):DataFrame={("es").option("",()).option("","9200").option("",value=true).option("",(",").join((indices:_*))+"/label").option("",2000).load()}defrun():Unit={valspark=_valesTempView="es_label"vallabelNames=Array("aen-label-retail","aen-label-seller")esDf(spark,labelNames).createOrReplaceTempView(esTempView)vallabelDf=getLabelDf(spark,itemIdsStr,attributeTypeIds,esTempView)println("debuglog")()()("final_labels")valdata=(s"""|selectcc.*,_result,,nullasremark|fromtemp_requestcc|leftjoinfinal_labelspp|=|=|='$jobId'|""".stripMargin)().().option("compression","gzip").json(s"s3://sherlockyb-test/check-precision/job_id=$jobId")}defgetLabelDf(spark:SparkSession,itemIdsStr:String,attributeTypeIds:Array[String],esTempView:String):DataFrame={_valsqlDf=(s"selectitemId,labelsfrom$esTempViewwhereitemIdin($itemIdsStr)")valemptyDf==(attributeTypeId={valattributeDf=(row={vallabels=[Seq[Row]]("labels")vallabelValue=(p=[Long]("id")==).map(p=[String]("label"))([Long]("itemId"),,)}).withColumn("final_result",lit("PASS")).toDF("itemId","attributeId","label","final_result").filter(col("label").isNotNull)if(labelDf==emptyDf){labelDf=attributeDf}else{labelDf=(attributeDf)}})("itemId","attributeId")}}
补充:提交sparkjob

将job工程打包为Jar,上传到AWS的s3,比如s3://sherlockyb-test/1.0.0/artifacts/spark/目录下,然后通过Genie提交sparkjob到Spark集群运行。Genie是Netflix研发的联合作业执行引擎,提供REST-fullAPI来运行各种大数据作业,如Hadoop、Pig、Hive、Spark、Presto、Sqoop等。

defrun_spark(job_name,spark_jar_name,spark_class_name,arg_str,spark_param=''):_GENIE_URL=""job=()\.genie_username('sherlockyb')\.job_name(job_name)\.job_version('0.0.1')\.metadata(teamId='team_account')\.metadata(teamCredential='team_password')_tags(['type:yarn-kerberos','sched:default'])_tags(['type:spark-submit-kerberos','ver:2.3.2'])_arguments(f"--class{spark_class_name}{spark_param}"f"s3a://sherlockyb-test/1.0.0/artifacts/spark/{spark_jar_name}"f"{arg_str}")#SubmitthejobtoGenierunning_job=()running_()returnrunning_

最新文章