Spark Crunch Indexer – Cloudera Search (Apache Solr)

Posted: September 25, 2022 in Hadoop
Tags:

An example that explains batch indexing (Spark Crunch Indexer) for Cloudera Search.

Cloudera Search offers the following methods for indexing data at scale:

  • batch indexing (Spark or MapReduce indexing: MapReduceIndexerTool or Lily HBase batch indexing)
  • NRT indexing (Lily HBase NRT indexing or Flume NRT indexing)

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDP 7.1.7 SP1)
  • Scala: 2.11.12
  • Spark: 2.4.7.7.1.7.1044-1
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

HDFS commands

# Create the HDFS landing directory for the CSV; -p creates missing parents and
# does not fail when the directory already exists, so the step can be re-run.
hdfs dfs -mkdir -p /user/solrsearch/cities
# Upload the GeoNames CSV; -f overwrites a copy left by a previous run.
hdfs dfs -put -f /home/solrsearch/cities/geonames-all-cities-with-a-population-1000.csv /user/solrsearch/cities

Hive queries

-- Recreate the external Hive table over the uploaded GeoNames CSV.
-- Every statement is qualified with the solrsearch database so that the DDL and
-- the verification query target the same table regardless of the current database.
--
-- NOTE: 'external.table.purge'='true' makes DROP TABLE delete the files under
-- LOCATION as well. Re-running this script therefore removes the uploaded CSV,
-- which has to be put into HDFS again before the table returns any rows.
DROP TABLE IF EXISTS solrsearch.cities;

-- NOTE: OpenCSVSerde exposes every column as string, whatever type is declared
-- below; CAST in queries where numeric or date semantics are needed.
CREATE EXTERNAL TABLE solrsearch.cities
(
    geoname_id        string,
    name              string,
    ascii_name        string,
    alternate_names   string,
    feature_class     string,
    feature_code      string,
    country_code      string,
    cou_name_en       string,
    country_code_2    string,
    admin1_code       string,
    admin2_code       string,
    admin3_code       string,
    admin4_code       string,
    population        integer,
    elevation         string,
    dem               integer,
    timezone          string,
    modification_date date,
    label_en          string,
    coordinates       string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES("separatorChar"=";","quoteChar"="\"")
LOCATION '/user/solrsearch/cities'
TBLPROPERTIES
(
    -- the first line of the CSV is the header row
    'skip.header.line.count'='1',
    'external.table.purge'='true'
);

-- Quick sanity check that the CSV is readable through the table.
SELECT * FROM solrsearch.cities LIMIT 100;

Solr commands

# Cloudera ZooKeeper services:
democdhm01.placko.eu
democdhm02.placko.eu
democdhd03.placko.eu
# Cloudera Solr service:
democdhd01.placko.eu
democdhd02.placko.eu
democdhd03.placko.eu

# Switch to the solrsearch user and obtain a Kerberos ticket from its keytab.
su solrsearch
kinit -kt /etc/security/keytabs/solrsearch.keytab solrsearch
# optional: klist

# Work from the user's home directory; the commands below refer to $HOME/cities.
cd /home/solrsearch

# Generate a template Solr configuration directory.
solrctl instancedir --generate $HOME/cities

# modify the managed-schema and add the morphline file (change the zkHost if needed)
solrctl config --upload cities $HOME/cities -overwrite

# optional: solrctl instancedir --list
# optional: solrctl instancedir --delete cities

# create the collection based on the configuration uploaded before
# (-s: number of shards, -r: replication factor, -c: configuration name)
solrctl collection --create cities -s 1 -r 3 -c cities
# optional: solrctl collection --list
# optional: solrctl collection --delete cities
 

# Fetch a Solr delegation token via SPNEGO (requires a valid Kerberos ticket).
# One request is enough: repeating it against a mistyped host fails and leaves the
# token file truncated to empty. The URL is quoted so the shell does not treat '?'
# as a glob character, and the token is written to $HOME/tokenFile.txt, the path
# that spark-submit passes via --files.
curl --negotiate -u: "https://democdhd01.placko.eu:8985/solr/admin?op=GETDELEGATIONTOKEN" > "$HOME/tokenFile.txt"
 
# Locate the Crunch indexer driver jar: the search-crunch-*.jar directly inside
# myDriverJarDir, excluding the *-job.jar and *-sources.jar variants.
export myDriverJarDir=/opt/cloudera/parcels/CDH/lib/solr/contrib/crunch
export myDriverJar=$(find $myDriverJarDir -maxdepth 1 -name 'search-crunch-*.jar' ! -name '*-job.jar' ! -name '*-sources.jar')
# Directory holding the dependency jars that are shipped with the job via --jars.
export myDependencyJarDir=/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch
# JVM system properties handed to both the driver and the executors (see below).
export myJVMOptions="-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/tmp/"
# Build a sorted, comma-separated list of all dependency jars; `head -c -1` drops the
# last byte, i.e. the trailing comma produced by tr.
export myDependencyJarFiles=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ',' | head -c -1)

# Submit CrunchIndexerTool to YARN in cluster mode.
#  - options before $myDriverJar are spark-submit options, everything after it is
#    passed to CrunchIndexerTool
#  - --files ships the delegation token and the morphline file with the job; the tool
#    arguments then refer to them by bare file name (tokenFile.txt, morphline.conf)
#  - morphlineVariable.ZK_HOST defines a morphline variable named ZK_HOST
#    NOTE(review): the morphline.conf shown in this post hard-codes zkHost and does
#    not reference ZK_HOST - confirm which value is intended
#  - the last argument is the HDFS input directory containing the CSV
spark-submit \
--name "SparkToSolrIngest-cities" \
--master yarn \
--deploy-mode cluster \
--jars $myDependencyJarFiles \
--executor-memory 16G \
--driver-memory 16G \
--conf "spark.executor.extraJavaOptions=$myJVMOptions" \
--driver-java-options "$myJVMOptions" \
--class org.apache.solr.crunch.CrunchIndexerTool \
--files $HOME/tokenFile.txt,$HOME/cities/conf/morphline.conf \
$myDriverJar \
-D hadoop.tmp.dir=/tmp \
-D tokenFile=tokenFile.txt \
-D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
--morphline-file morphline.conf \
--pipeline-type spark \
--chatty \
hdfs://nameservice1/user/solrsearch/cities

managed-schema

<?xml version="1.0" encoding="UTF-8" ?>

<!-- Solr schema for the "cities" collection: one document per GeoNames city row. -->
<schema name="cities" version="1.6">
	<!-- Document fields: same names and order as the CSV columns; geoname_id is the unique key. -->
	<field name="geoname_id" type="string" indexed="true" stored="true" required="true" multiValued="false"/>
	<!-- "name" is the only analysed field; it is indexed as edge n-grams for autocomplete. -->
	<field name="name" type="text_autocomplete" indexed="true" stored="true" omitNorms="true" omitTermFreqAndPositions="true"/>
	<field name="ascii_name" type="string" indexed="true" stored="true"/>
	<field name="alternate_names" type="string" indexed="true" stored="true"/>
	<field name="feature_class" type="string" indexed="true" stored="true"/>
	<field name="feature_code" type="string" indexed="true" stored="true"/>
	<field name="country_code" type="string" indexed="true" stored="true"/>
	<field name="cou_name_en" type="string" indexed="true" stored="true"/>
	<field name="country_code_2" type="string" indexed="true" stored="true"/>
	<field name="admin1_code" type="string" indexed="true" stored="true"/>
	<field name="admin2_code" type="string" indexed="true" stored="true"/>
	<field name="admin3_code" type="string" indexed="true" stored="true"/>
	<field name="admin4_code" type="string" indexed="true" stored="true"/>
	<field name="population" type="string" indexed="true" stored="true"/>
	<field name="elevation" type="string" indexed="true" stored="true"/>
	<field name="dem" type="string" indexed="true" stored="true"/>
	<field name="timezone" type="string" indexed="true" stored="true"/>
	<field name="modification_date" type="string" indexed="true" stored="true"/>
	<field name="label_en" type="string" indexed="true" stored="true"/>
	<field name="coordinates" type="string" indexed="true" stored="true"/>
	<field name="_version_" type="long" indexed="true" stored="true"/>
	<uniqueKey>geoname_id</uniqueKey>

	<!--
		Autocomplete type: the whole value is kept as a single token, normalised and
		lower-cased. The index analyzer additionally emits edge n-grams of 3 to 10
		characters; the query analyzer does not, so a typed prefix is matched against
		the indexed n-grams. The n-gram analyzer must be declared as type="index":
		with type="multiterm" no index analyzer is defined and no n-grams are written
		to the index.
	-->
	<fieldType name="text_autocomplete" class="solr.TextField" positionIncrementGap="100">
		<analyzer type="index">
			<tokenizer class="solr.KeywordTokenizerFactory"/>
			<filter class="solr.GermanNormalizationFilterFactory"/>
			<filter class="solr.LowerCaseFilterFactory"/>
			<filter class="solr.EdgeNGramFilterFactory" minGramSize="3" maxGramSize="10" />
		</analyzer>
		<analyzer type="query">
			<tokenizer class="solr.KeywordTokenizerFactory"/>
			<filter class="solr.GermanNormalizationFilterFactory"/>
			<filter class="solr.LowerCaseFilterFactory"/>
		</analyzer>
	</fieldType>

	<!-- Primitive field types; only "string" and "long" are referenced by the fields above. -->
	<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
	<fieldType name="string" class="solr.StrField" sortMissingLast="true" docValues="true" />
	<fieldType name="strings" class="solr.StrField" sortMissingLast="true" multiValued="true" docValues="true" />
	<fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
	<fieldType name="booleans" class="solr.BoolField" sortMissingLast="true" multiValued="true"/>
	<fieldType name="pint" class="solr.IntPointField" docValues="true"/>
	<fieldType name="pfloat" class="solr.FloatPointField" docValues="true"/>
	<fieldType name="plong" class="solr.LongPointField" docValues="true"/>
	<fieldType name="pdouble" class="solr.DoublePointField" docValues="true"/>
	<fieldType name="pints" class="solr.IntPointField" docValues="true" multiValued="true"/>
	<fieldType name="pfloats" class="solr.FloatPointField" docValues="true" multiValued="true"/>
	<fieldType name="plongs" class="solr.LongPointField" docValues="true" multiValued="true"/>
	<fieldType name="pdoubles" class="solr.DoublePointField" docValues="true" multiValued="true"/>
	<fieldType name="pdate" class="solr.DatePointField" docValues="true"/>
	<fieldType name="pdates" class="solr.DatePointField" docValues="true" multiValued="true"/>
	<fieldType name="binary" class="solr.BinaryField"/>

	<!-- General-purpose tokenised text type (not referenced by any field above). -->
	<fieldType name="text_general" class="solr.TextField" positionIncrementGap="100" multiValued="true">
		<analyzer type="index">
			<tokenizer class="solr.StandardTokenizerFactory"/>
			<filter class="solr.StopFilterFactory" ignoreCase="true" words="stopwords.txt" />
			<filter class="solr.LowerCaseFilterFactory"/>
		</analyzer>
		<analyzer type="query">
			<tokenizer class="solr.StandardTokenizerFactory"/>
			<filter class="solr.StopFilterFactory" ignoreCase="true" words="stopwords.txt" />
			<filter class="solr.SynonymGraphFilterFactory" synonyms="synonyms.txt" ignoreCase="true" expand="true"/>
			<filter class="solr.LowerCaseFilterFactory"/>
		</analyzer>
	</fieldType>
</schema>

morphline.conf

# Morphline for the cities import: parse the semicolon-separated CSV and load
# every record into the "cities" Solr collection.

# Target collection and the ZooKeeper address (including the /solr chroot) used to reach it.
# NOTE(review): zkHost is hard-coded; this file does not reference a ZK_HOST variable.
SOLR_LOCATOR : {
  collection : cities
  zkHost : "democdhm02.placko.eu:2181/solr"
}

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      # Step 1: split each input line into named fields (the header line is skipped).
      {
        readCSV {
          charset : UTF-8
          separator : ";"
          quoteChar : "\""
          commentPrefix : ""
          ignoreFirstLine : true
          trim : true
          columns : [
            geoname_id,
            name,
            ascii_name,
            alternate_names,
            feature_class,
            feature_code,
            country_code,
            cou_name_en,
            country_code_2,
            admin1_code,
            admin2_code,
            admin3_code,
            admin4_code,
            population,
            elevation,
            dem,
            timezone,
            modification_date,
            label_en,
            coordinates
          ]
        }
      }

      # Step 2: sanitize record fields that are unknown to the Solr schema.
      {
        sanitizeUnknownSolrFields {
          # where the Solr schema is fetched from
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      # Step 3: write the record to SLF4J at DEBUG level.
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # Step 4: send the record to Solr.
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]

Security – Ranger

HDFS
policy: hdfs_solrsearch_rwe
path: /user/solrsearch/cities
group: solrsearch / user: solrsearch
permissions: read, write, execute

HIVE
policy: hive_solrsearch_all
database: solrsearch (tb: all, cl: all)
group: solrsearch / user: solrsearch
permissions: all

policy: hive_url_solrsearch_r
url: hdfs://nameservice1/user/solrsearch/cities
group: solrsearch / user: solrsearch
permissions: read

SOLR
policy: solr_solrsearch
Solr Collection: cities
group: solrsearch / user: solrsearch
permissions: query, update, others, solr admin

Security – Kerberos

- SPNEGO -> authentication via Kerberos (not via Knox) - for client apps with a large number of requests, e.g. Solr use cases
- Solr admin via Knox (user and password) -> https://democdhm01.placko.eu:8443/gateway/homepage/home/

Kerberos example (source: https://web.mit.edu/kerberos/dist/#kfw-3.2)
Kerberos on Windows Client
Firefox
about:config
network.negotiate-auth.trusted-uris  .placko.eu
network.auth.use-sspi false
network.negotiate-auth.allow-proxies true
network.negotiate-auth.delegation-uris  .placko.eu
 
cd C:\Program Files\MIT\Kerberos\bin
set KRB5_CONFIG=C:\Users\<user>\Documents\krb5.ini
set KRB5_TRACE=C:\Users\<user>\Documents\kinit.log
kinit solrsearch@<FQDN>

Testing

# Request a delegation token via SPNEGO; the URL is quoted so the shell does not
# treat '?' as a glob character.
curl --negotiate -u: "https://democdhd01.placko.eu:8985/solr/admin?op=GETDELEGATIONTOKEN" > tokenFile.txt

# Query the cities collection for documents whose ascii_name starts with "Brati".
curl --negotiate -u: "https://democdhd01.placko.eu:8985/solr/cities/select?q=ascii_name:Brati*"

{
  "responseHeader":{
    "zkConnected":true,
    "status":0,
    "QTime":1,
    "params":{
      "q":"ascii_name:Brati*"}},
  "response":{"numFound":3,"start":0,"docs":[
      {
        "admin1_code":"02",
        "timezone":"Europe/Bratislava",
        "dem":"133",
        "geoname_id":"3060280",
        "ascii_name":"Bratislava - Vajnory",
        "admin2_code":"103",
        "modification_date":"2019-06-09",
        "feature_code":"PPLX",
        "cou_name_en":"Slovakia",
        "coordinates":"48.20563,17.20759",
        "label_en":"Slovakia",
        "population":"5484",
        "alternate_names":"Pozsonyszolos,Pozsonyszőlős,Pracsa,Prácsa,Vajnory,Weinern",
        "country_code":"SK",
        "feature_class":"P",
        "name":"Bratislava - Vajnory",
        "admin3_code":"529362",
        "_version_":1744658835843317767},
      {
        "admin1_code":"04",
        "timezone":"Europe/Bucharest",
        "country_code_2":"RO",
        "dem":"269",
        "geoname_id":"683802",
        "ascii_name":"Bratila",
        "admin2_code":"22898",
        "modification_date":"2013-04-21",
        "feature_code":"PPL",
        "cou_name_en":"Romania",
        "coordinates":"46.32348,26.77442",
        "label_en":"Romania",
        "population":"2092",
        "alternate_names":"Bratila,Bratila de Mijloc,Brătila,Brătila de Mijloc",
        "country_code":"RO",
        "feature_class":"P",
        "name":"Brătila",
        "_version_":1744658841818103812},
      {
        "admin1_code":"02",
        "timezone":"Europe/Bratislava",
        "dem":"157",
        "geoname_id":"3060972",
        "ascii_name":"Bratislava",
        "modification_date":"2019-09-05",
        "feature_code":"PPLC",
        "cou_name_en":"Slovakia",
        "coordinates":"48.14816,17.10674",
        "label_en":"Slovakia",
        "population":"423737",
        "alternate_names":"An Bhrataslaiv,An Bhratasláiv,BTS,Baratislawa,Bracislava,Bratislav,Bratislava,Bratislava osh,Bratislavae,Bratislavo,Bratislawa,Bratisllava,Bratisława,Bratyslawa,Bratysława,Bratîslava,Mpratislaba,Posonium,Pozsony,Presburg,Presporok,Prespurk,Pressburg,Preszburg,Preßburg,Prešporok,Prešpurk,beulatiseullaba,bra ti sla wa,bratisalava,bratislabha,bratislava,bratislavha,bratslawa,bratsylava,bratyslafa,bratyslava,bratyslaw,bratyslawa,bu la di si la fa,burachisuravua,pirattislava,Μπρατισλάβα,Братислав,Братиславæ,Братислава,Братислава ош,Братіслава,Братїслава,Браціслава,Բրատիսլավա,בראטיסלאווא,ברטיסלאבה,براتسلاوا,براتسیلاڤا,براتىسلاۋا,براتيسلافا,براتیسلاو,براتیسلاوا,براٹیسلاوا,ܒܪܛܝܣܠܐܒܐ,ब्रातिस्लाभा,ब्रातिस्लाव्हा,ব্রাতিস্লাভা,ਬ੍ਰਾਤਿਸਲਾਵਾ,பிராத்திஸ்லாவா,ಬ್ರಾಟಿಸ್ಲಾವಾ,ബ്രാട്ടിസ്‌ലാവ,บราติสลาวา,བ་ར་ཏིསི་ལ་བ།,ბრატისლავა,ብራቲስላቫ,ブラチスラヴァ,布拉迪斯拉发,布拉迪斯拉發,브라티슬라바",
        "country_code":"SK",
        "feature_class":"P",
        "name":"Bratislava",
        "_version_":1744658845288890378}]
  }}

Additional Info

Advertisement

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s