PySpark read and write to Phoenix table
A simple question: is it possible to load a PySpark data frame into a Phoenix table? The answer is YES.
Let's see how we can do that.
To push data from a PySpark data frame to a Phoenix table, we need an existing Phoenix table created through Phoenix, not through the HBase shell. Tables created from the HBase shell often don't appear in Phoenix until you create a table or view in Phoenix that points to the existing HBase table. To avoid that kind of discrepancy, I suggest loading data into HBase through Phoenix if you are willing to use Phoenix for querying.
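If you do already have an HBase table and want Phoenix to see it, a view is the usual way to map it. As a rough sketch, with placeholder table, column family, and column names (the quoted names must match the existing HBase names exactly, since Phoenix upper-cases unquoted identifiers):
CREATE VIEW "existing_hbase_table" ( ROWKEY VARCHAR PRIMARY KEY, "cf"."col1" VARCHAR );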
- If the table does not exist, use the command below to create it in Phoenix.
CREATE TABLE IF NOT EXISTS <TABLE_NAME> ( ROWKEY <DATA_TYPE> NOT NULL PRIMARY KEY, <COL_FAMILY>.<COL_NAME> <DATA_TYPE>, <COL_FAMILY>.<COL_NAME> <DATA_TYPE>, ... up to n columns );
NOTE: If you're creating a Phoenix table to point to an existing HBase table, make sure that the data types mentioned in the create query match the data types in the existing HBase table. For the data types available in Phoenix, refer to the Phoenix documentation. For example, you need to use VARCHAR instead of string, BIGINT instead of long, etc.
For example,
CREATE TABLE IF NOT EXISTS SAMPLE ( ID VARCHAR NOT NULL PRIMARY KEY, CF1.COL1 INTEGER, CF1.COL2 DOUBLE, CF2.COL3 FLOAT, CF2.COL4 DECIMAL(12,2));
- Now write the data frame to the existing Phoenix table:
df.write.format("org.apache.phoenix.spark").mode("overwrite").option("table", "<TABLE_NAME>").option("zkUrl", "<clusterName>:<zkNodePortNumber>").save()
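For instance, here is a minimal end-to-end sketch of the write step, assuming the SAMPLE table above, a placeholder ZooKeeper quorum zkhost:2181, and that the phoenix-spark connector jar is on the Spark classpath:

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType, DecimalType

spark = SparkSession.builder.appName("phoenix-write-sketch").getOrCreate()

# Column names match the SAMPLE table; Phoenix upper-cases unquoted identifiers,
# so the data frame columns are upper case as well.
schema = StructType([
    StructField("ID", StringType(), False),
    StructField("COL1", IntegerType(), True),
    StructField("COL2", DoubleType(), True),
    StructField("COL3", FloatType(), True),
    StructField("COL4", DecimalType(12, 2), True),
])

df = spark.createDataFrame(
    [("row1", 2, 3.0, 4.0, Decimal("10.50")),
     ("row2", 6, 7.0, 8.0, Decimal("20.75"))],
    schema,
)

# zkhost:2181 is a placeholder for your ZooKeeper quorum.
df.write.format("org.apache.phoenix.spark") \
    .mode("overwrite") \
    .option("table", "SAMPLE") \
    .option("zkUrl", "zkhost:2181") \
    .save()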
NOTE: The column names in the PySpark data frame should match the column names in the existing Phoenix table. The cluster name in the zkUrl option should be the FQDN of the HBase/ZooKeeper node, and the default port number for ZooKeeper is 2181 (check your environment and provide it accordingly).
- Now check in your HBase shell whether the data was inserted into the table, and you'll find something interesting. You'll see that the row key specified in the Phoenix query also appears as a column with a random value (in my table the value is 'x'). This is what happens when you load data through Phoenix. I don't have much information about this, but post comments if you find something.
In Phoenix it appears as:
| C   | col1  | col2  | col3   |
+-----+-------+-------+--------+
| 1   | 2     | 3     | 4      |
| 15  | 16    | 17    | 18     |
| 5   | 6     | 7     | 8      |
+-----+-------+-------+--------+
In HBASE it appears as:
1 column=cf:\x00\x00\x00\x00, timestamp=1546530120297, value=x
1 column=cf:\x80\x0B, timestamp=1546530120297, value=2
1 column=cf:\x80\x0C, timestamp=1546530120297, value=3
1 column=cf:\x80\x0D, timestamp=1546530120297, value=4
15 column=cf:\x00\x00\x00\x00, timestamp=1546530148087, value=x
15 column=cf:\x80\x0B, timestamp=1546530148087, value=16
15 column=cf:\x80\x0C, timestamp=1546530148087, value=17
15 column=cf:\x80\x0D, timestamp=1546530148087, value=18
5 column=cf:\x00\x00\x00\x00, timestamp=1546530139067, value=x
5 column=cf:\x80\x0B, timestamp=1546530139067, value=6
5 column=cf:\x80\x0C, timestamp=1546530139067, value=7
5 column=cf:\x80\x0D, timestamp=1546530139067, value=8
Don't worry if the data looks like gibberish to you; that's how it displays in the HBase shell, because everything you insert into HBase is stored as a byte array regardless of the data type you declared. The exception is the timestamp, which displays nicely as a number since it's managed internally by HBase itself. This is because HBase tries to handle all kinds of data in a single, unified way.
- Now let's load a Phoenix table into a PySpark data frame:
df = spark.read.format("org.apache.phoenix.spark").option("table", "<TABLE_NAME>").option("zkUrl", "<clusterName>:<zkNodePortNumber>").load()
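As a quick sketch, again assuming the SAMPLE table and the placeholder ZooKeeper quorum zkhost:2181, the loaded data frame behaves like any other PySpark data frame:

df = spark.read.format("org.apache.phoenix.spark") \
    .option("table", "SAMPLE") \
    .option("zkUrl", "zkhost:2181") \
    .load()

df.printSchema()               # column names come back in Phoenix's upper-case form
df.filter(df.COL1 > 5).show()  # hypothetical filter on the COL1 column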
Hope this is useful. Please share if so.😊