Skip to content

Instantly share code, notes, and snippets.

@randypitcherii
Created July 7, 2020 18:44
Show Gist options
  • Select an option

  • Save randypitcherii/06680c58690842268e2c82b152cd19aa to your computer and use it in GitHub Desktop.

Select an option

Save randypitcherii/06680c58690842268e2c82b152cd19aa to your computer and use it in GitHub Desktop.
//=============================================================================
// Set context
//=============================================================================
USE ROLE <YOUR_ROLE>;
USE WAREHOUSE <YOUR_WAREHOUSE>;
USE DATABASE <YOUR_DATABASE>;
CREATE SCHEMA NETFLIX_STREAMS_AND_TASKS;
//=============================================================================
//=============================================================================
// Create tables and add show data
//=============================================================================
CREATE OR REPLACE TABLE
NETFLIX_RATING_EVENTS(
RAW_DATA VARIANT
);
CREATE OR REPLACE TABLE
NETFLIX_RATINGS(
SHOW_ID NUMBER,
SHOW_TITLE STRING,
RATING NUMBER
);
CREATE OR REPLACE TABLE
NETFLIX_SHOWS(
ID NUMBER,
TITLE STRING
);
INSERT INTO
NETFLIX_SHOWS // Shows added in no particular order
VALUES
(0, 'BoJack Horseman'), // <--- This is the best show on Netflix
(1, 'Ozark'),
(2, 'Master of None'),
(3, 'Mindhunter'),
(4, 'The Haunting of Hill House'),
(5, 'Tiger King'),
(6, 'Stranger Things');
//=============================================================================
//=============================================================================
// Create 2 streams on the NETFLIX_RATING_EVENTS table
//=============================================================================
CREATE OR REPLACE STREAM STREAM_A ON TABLE NETFLIX_RATING_EVENTS;
CREATE OR REPLACE STREAM STREAM_B ON TABLE NETFLIX_RATING_EVENTS;
SHOW STREAMS;
//=============================================================================
//=============================================================================
// Modify NETFLIX_RATING_EVENTS and examine streams
//=============================================================================
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON(' { "show_id": 0, "rating": 10 } ');
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
INSERT INTO NETFLIX_RATINGS(SHOW_ID, SHOW_TITLE, RATING) (
SELECT
NETFLIX_SHOWS.ID AS SHOW_ID,
NETFLIX_SHOWS.TITLE AS SHOW_TITLE,
STREAM_A.RAW_DATA:"rating" AS RATING
FROM
NETFLIX_SHOWS RIGHT OUTER JOIN STREAM_A
ON NETFLIX_SHOWS.ID = STREAM_A.RAW_DATA:"show_id"
WHERE
STREAM_A.RAW_DATA:"rating" IS NOT NULL
);
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON(' { "show_id": 1, "rating": 9 } ');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON(' { "show_id": 2, "rating": 8 } ');
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
INSERT INTO NETFLIX_RATINGS(SHOW_ID, SHOW_TITLE, RATING) (
SELECT
NETFLIX_SHOWS.ID AS SHOW_ID,
NETFLIX_SHOWS.TITLE AS SHOW_TITLE,
STREAM_A.RAW_DATA:"rating" AS RATING
FROM
NETFLIX_SHOWS RIGHT OUTER JOIN STREAM_A
ON NETFLIX_SHOWS.ID = STREAM_A.RAW_DATA:"show_id"
WHERE
STREAM_A.RAW_DATA:"rating" IS NOT NULL
);
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
DELETE FROM NETFLIX_RATING_EVENTS WHERE RAW_DATA:"rating" IS NOT NULL;
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;
//=============================================================================
//=============================================================================
// Automate stream ingestion with a task
//=============================================================================
CREATE OR REPLACE TASK NETFLIX_RATINGS_EVENT_PROCESSOR
WAREHOUSE = <YOUR_WAREHOUSE> -- Add your warehouse here
SCHEDULE = 'USING CRON * * * * * America/Chicago' // process new records every minute
WHEN
SYSTEM$STREAM_HAS_DATA('STREAM_A')
AS
INSERT INTO NETFLIX_RATINGS(SHOW_ID, SHOW_TITLE, RATING) (
SELECT
NETFLIX_SHOWS.ID AS SHOW_ID,
NETFLIX_SHOWS.TITLE AS SHOW_TITLE,
STREAM_A.RAW_DATA:"rating" AS RATING
FROM
NETFLIX_SHOWS RIGHT OUTER JOIN STREAM_A
ON NETFLIX_SHOWS.ID = STREAM_A.RAW_DATA:"show_id"
WHERE
STREAM_A.RAW_DATA:"rating" IS NOT NULL
);
// Tasks are suspended by default. Resume the task so it will run on schedule
ALTER TASK NETFLIX_RATINGS_EVENT_PROCESSOR RESUME;
// add new events
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON(' { "show_id": 3, "rating": 8 } ');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON(' { "show_id": 4, "rating": 8 } ');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON(' { "show_id": 5, "rating": 7 } ');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON(' { "show_id": 6, "rating": 9 } ');
INSERT INTO NETFLIX_RATING_EVENTS(RAW_DATA) SELECT PARSE_JSON(' { "platform": 9.75} ');
// Find the best show on netflix
SELECT
SHOW_TITLE,
AVG(RATING) AS AVG_RATING
FROM
NETFLIX_RATINGS
GROUP BY
SHOW_TITLE
ORDER BY
AVG_RATING DESC
LIMIT 1;
//=============================================================================
//=============================================================================
// Cleanup
//=============================================================================
DROP TASK IF EXISTS NETFLIX_RATINGS_EVENT_PROCESSOR;
DROP TABLE IF EXISTS NETFLIX_SHOWS;
DROP TABLE IF EXISTS NETFLIX_RATING_EVENTS;
DROP TABLE IF EXISTS NETFLIX_RATINGS;
DROP STREAM IF EXISTS STREAM_A;
DROP STREAM IF EXISTS STREAM_B;
DROP SCHEMA IF EXISTS NETFLIX_STREAMS_AND_TASKS;
//=============================================================================
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment