StreamNative Platform Demo Script


Install StreamNative Platform

helm repo add streamnative https://charts.streamnative.io
helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com
helm repo add jetstack https://charts.jetstack.io
helm repo update

kubectl create namespace sn-system
export NAMESPACE=sn-system
helm upgrade --install vault-operator banzaicloud-stable/vault-operator -n $NAMESPACE

helm upgrade --install cert-manager jetstack/cert-manager -n $NAMESPACE --set installCRDs=true

helm upgrade --install pulsar-operator streamnative/pulsar-operator -n $NAMESPACE

helm upgrade --install function-mesh streamnative/function-mesh-operator -n $NAMESPACE 

kubectl get po -n $NAMESPACE
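If everything came up cleanly, you should see one or more Running pods per operator, roughly like the illustrative listing below (exact names, counts, and hash suffixes vary by chart version; the pulsar-operator chart deploys separate zookeeper, bookkeeper, and pulsar operator pods):

NAME                                       READY   STATUS    RESTARTS
cert-manager-...                           1/1     Running   0
cert-manager-cainjector-...                1/1     Running   0
cert-manager-webhook-...                   1/1     Running   0
function-mesh-controller-manager-...       1/1     Running   0
pulsar-operator-...                        1/1     Running   0
vault-operator-...                         1/1     Running   0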

Install with custom images

If you can't access the original images used by the operators above, you can use your own images.

vault operator

helm inspect values banzaicloud-stable/vault-operator  > value_vault_operator.yaml

Change the following image settings in value_vault_operator.yaml

image:
  bankVaultsRepository: ghcr.io/banzaicloud/bank-vaults
  repository: ghcr.io/banzaicloud/vault-operator
  # tag: ""
  pullPolicy: IfNotPresent
  imagePullSecrets: []  # global.imagePullSecrets is also supported

For example, if you wish to use the images from Docker Hub, you can change them as below.

image:
  bankVaultsRepository: banzaicloud/bank-vaults
  repository: banzaicloud/vault-operator
  # tag: ""
  pullPolicy: IfNotPresent
  imagePullSecrets: []  # global.imagePullSecrets is also supported

Then install the vault operator with the command below.

helm upgrade --install -f value_vault_operator.yaml vault-operator banzaicloud-stable/vault-operator -n sn-system
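If you only need to override the image fields, you can also pass them on the command line instead of editing the values file (the key paths are taken from the snippet above):

helm upgrade --install vault-operator banzaicloud-stable/vault-operator -n sn-system \
  --set image.repository=banzaicloud/vault-operator \
  --set image.bankVaultsRepository=banzaicloud/bank-vaults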

cert-manager

helm inspect values jetstack/cert-manager  > value_cert-manager.yaml

Change the following image settings in value_cert-manager.yaml. Note that the webhook and cainjector images live under their own top-level keys (webhook.image and cainjector.image):

image:
  repository: quay.io/jetstack/cert-manager-controller
  # You can manage a registry with
  # registry: quay.io
  # repository: jetstack/cert-manager-controller

  # Override the image tag to deploy by setting this variable.
  # If no value is set, the chart's appVersion will be used.
  # tag: canary

  # Setting a digest will override any tag
  # digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20
  pullPolicy: IfNotPresent

webhook:
  image:
    repository: quay.io/jetstack/cert-manager-webhook
    # You can manage a registry with
    # registry: quay.io
    # repository: jetstack/cert-manager-webhook

    # Override the image tag to deploy by setting this variable.
    # If no value is set, the chart's appVersion will be used.
    # tag: canary

    # Setting a digest will override any tag
    # digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20

    pullPolicy: IfNotPresent

cainjector:
  image:
    repository: quay.io/jetstack/cert-manager-cainjector
    # You can manage a registry with
    # registry: quay.io
    # repository: jetstack/cert-manager-cainjector

    # Override the image tag to deploy by setting this variable.
    # If no value is set, the chart's appVersion will be used.
    # tag: canary

    # Setting a digest will override any tag
    # digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20

    pullPolicy: IfNotPresent

For example, if you wish to use the images from Docker Hub, you can change them as below.

image:
  repository: streamnative/cert-manager-controller
  # You can manage a registry with
  # registry: quay.io
  # repository: jetstack/cert-manager-controller

  # Override the image tag to deploy by setting this variable.
  # If no value is set, the chart's appVersion will be used.
  tag: v1.4.0

  # Setting a digest will override any tag
  # digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20
  pullPolicy: IfNotPresent

webhook:
  image:
    repository: streamnative/cert-manager-webhook
    # You can manage a registry with
    # registry: quay.io
    # repository: jetstack/cert-manager-webhook

    # Override the image tag to deploy by setting this variable.
    # If no value is set, the chart's appVersion will be used.
    tag: v1.4.0

    # Setting a digest will override any tag
    # digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20

    pullPolicy: IfNotPresent

cainjector:
  image:
    repository: streamnative/cert-manager-cainjector
    # You can manage a registry with
    # registry: quay.io
    # repository: jetstack/cert-manager-cainjector

    # Override the image tag to deploy by setting this variable.
    # If no value is set, the chart's appVersion will be used.
    tag: v1.4.0

    # Setting a digest will override any tag
    # digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20

    pullPolicy: IfNotPresent

Then install cert-manager with the command below.

helm upgrade --install -f value_cert-manager.yaml  cert-manager jetstack/cert-manager -n sn-system --set installCRDs=true

pulsar operator

helm inspect values streamnative/pulsar-operator  > value_pulsar-operator.yaml

Change the following image settings in value_pulsar-operator.yaml

images:
  zookeeper:
    repository: streamnative/zookeeper-operator
    tag: v0.7.0-rc5
    pullPolicy: IfNotPresent
  bookkeeper:
    repository: streamnative/bookkeeper-operator
    tag: v0.6.12
    pullPolicy: IfNotPresent
  pulsar:
    repository: streamnative/pulsar-operator
    tag: 0.7.0-rc6
    pullPolicy: IfNotPresent

Then install the pulsar operator with the command below.

helm upgrade --install pulsar-operator -f value_pulsar-operator.yaml streamnative/pulsar-operator -n sn-system

function mesh

helm inspect values streamnative/function-mesh-operator  > value_function-mesh-operator.yaml

Change the following image settings in value_function-mesh-operator.yaml

operatorImage: streamnative/function-mesh:v0.1.6-rc1

Then install the function mesh operator with the command below.

helm upgrade --install function-mesh -f value_function-mesh-operator.yaml streamnative/function-mesh-operator -n sn-system

Deploy your own cluster

  • Change demo.yaml to what you expect for your cluster
wget https://raw.githubusercontent.com/streamnative/examples/master/platform/values_cluster.yaml -O demo.yaml
  • Create namespace
kubectl create namespace pulsar-demo
  • Create demo cluster using helm command
# online version
helm install -f demo.yaml demo streamnative/sn-platform -n pulsar-demo --set initialize=true

# offline version ($PULSAR_CHART points to a local copy of the sn-platform chart)
helm install -f demo.yaml demo $PULSAR_CHART/ -n pulsar-demo --set initialize=true

kubectl get po -n pulsar-demo

Access services through port-forward

open streamnative console

kubectl port-forward demo-sn-platform-streamnative-console-0 9527:9527 -n pulsar-demo
  • Visit http://localhost:9527; the default username is admin. Get the password for admin with the following command.
kubectl get secret demo-sn-platform-vault-console-admin-passwd -o=jsonpath='{.data.password}' -n pulsar-demo | base64 --decode; echo
  • create a new service account on streamnative console and copy its token
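For scripting, the password lookup above can be captured in a shell variable (the variable name is illustrative):

CONSOLE_PASS=$(kubectl get secret demo-sn-platform-vault-console-admin-passwd -o=jsonpath='{.data.password}' -n pulsar-demo | base64 --decode)
echo "$CONSOLE_PASS"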

open grafana to show dashboards

kubectl port-forward svc/demo-sn-platform-grafana 3000 -n pulsar-demo

expose the pulsar web service and proxy service locally

kubectl port-forward service/demo-sn-platform-proxy-headless 6650 -n pulsar-demo
kubectl port-forward service/demo-sn-platform-proxy-headless 8080 -n pulsar-demo
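kubectl can also forward both ports in a single session, which is convenient here (one command instead of two terminals):

kubectl port-forward service/demo-sn-platform-proxy-headless 6650 8080 -n pulsar-demo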

Use pulsar client to produce and consume with token authentication, and demo authorization

  • Change directory to your PULSAR_HOME and set the values below in conf/client.conf
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:[YOUR_SERVICE_ACCOUNT_TOKEN]
  • Start pulsar consumer
bin/pulsar-client consume public/default/test-topic-1 -s pulsar-demo -n 0
  • Start pulsar producer
bin/pulsar-client produce public/default/test-topic-1 -m test-message -n 1000 -r 5
  • Check if messages are produced and consumed correctly
  • open StreamNative Console to discover producers and consumers, and to peek messages

Use toolset pod to do the same things easily inside k8s

kubectl exec -n pulsar-demo -it demo-sn-platform-toolset-0 -- bash

bin/pulsar-admin clusters list

bin/pulsar-client produce public/default/test-topic-1 -m test-message -n 1

export TOKEN=ZmRkZmVmNmItOWFlMi01ZjU0LTBlMjgtMjk4NmUyZmM2ZDJlOmQ1NTYxZmIyLWU4MGItNmYyNi1hMzQ0LTA1M2I3MmFhYTY3MA==
bin/pulsarctl \
--admin-service-url http://demo-sn-platform-broker:8080 \
--token "$TOKEN" \
clusters list


Use kafka client to produce/consume inside k8s

kubectl run kafka --rm -it -n pulsar-demo --image bitnami/kafka -- bash


export TOKEN=NDkzNjVkOTctYWY0NS0xYjNiLWI1NGYtNTgyZmYxOTcwOGIzOjNkNjdhMDViLTA1MzgtYzVlZi1mYzI4LWMxYzEyNmIyYmVmYQ==

kafka-console-producer.sh \
--producer-property security.protocol=SASL_PLAINTEXT \
--producer-property sasl.mechanism=PLAIN \
--producer-property 'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="public/default" password="token:'$TOKEN'";' \
--broker-list demo-sn-platform-broker:9092 --topic test-kop-1 


kubectl exec -ti -n pulsar-demo kafka -- bash

# the new shell does not inherit TOKEN, so export it again
export TOKEN=NDkzNjVkOTctYWY0NS0xYjNiLWI1NGYtNTgyZmYxOTcwOGIzOjNkNjdhMDViLTA1MzgtYzVlZi1mYzI4LWMxYzEyNmIyYmVmYQ==

kafka-console-consumer.sh \
--consumer-property security.protocol=SASL_PLAINTEXT \
--consumer-property sasl.mechanism=PLAIN \
--consumer-property 'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="public/default" password="token:'$TOKEN'";' \
--bootstrap-server demo-sn-platform-broker:9092 --topic test-kop-1 

Use pulsar client to consume data from kafka topic

kubectl exec -n pulsar-demo -it demo-sn-platform-toolset-0 -- bash

bin/pulsar-client consume public/default/test-kop-1 -s pulsar-demo -n 0

Scale up & down

  • Change replica count of broker/proxy/bookie/zookeeper
  • Run following command to apply change
# online version
helm upgrade -f demo.yaml demo streamnative/sn-platform -n pulsar-demo

# offline version
helm upgrade -f demo.yaml demo $PULSAR_CHART/ -n pulsar-demo
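You can watch pods being added or removed while the change rolls out:

watch 'kubectl get po -n pulsar-demo'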

Restart Broker

kubectl get statefulset -n pulsar-demo
kubectl rollout restart statefulset/demo-sn-platform-broker -n pulsar-demo
kubectl rollout status statefulset/demo-sn-platform-broker -n pulsar-demo
watch 'kubectl get po -n pulsar-demo|grep broker'

Note: a StatefulSet restarts its pods one at a time by default (RollingUpdate strategy), so only one broker pod is down at any moment during the rollout.

Enable Audit Log on Pulsar Broker

Add the config below under the broker.configData node in demo.yaml

    # For audit log
    PULSAR_PREFIX_brokerInterceptors: "audit-log"
    PULSAR_PREFIX_brokerInterceptorsDirectory: "./interceptors"
    PULSAR_PREFIX_snAuditLogConfig: >
      {"defaultTopics":{"allowed":"persistent://sn/system/audit_log_all","denied":"persistent://sn/system/audit_log_all"}}

Upgrade cluster

helm upgrade -f demo.yaml demo $PULSAR_CHART/ -n pulsar-demo
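After the upgrade, you can sanity-check that the PULSAR_PREFIX_ settings were rendered into the broker configuration (a quick check; the conf path assumes the standard Pulsar image layout):

kubectl exec -n pulsar-demo demo-sn-platform-broker-0 -- grep -E 'brokerInterceptors|snAuditLogConfig' conf/broker.conf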

Create a new namespace under public tenant

kubectl exec -n pulsar-demo -it demo-sn-platform-toolset-0 -- bash
bin/pulsar-admin namespaces create public/audit_log

Check the audit log. With the snAuditLogConfig above, both allowed and denied events are routed to persistent://sn/system/audit_log_all; without that setting, the default topics are audit_log_allowed and audit_log_denied.

bin/pulsar-admin topics peek-messages -n 1 -s audit persistent://sn/system/audit_log_all

# with the default topics:
bin/pulsar-admin topics peek-messages -n 1 -s audit persistent://sn/system/audit_log_allowed
bin/pulsar-admin topics peek-messages -n 1 -s audit persistent://sn/system/audit_log_denied

There will be an audit log entry like the following one.

{"id":"11c5296d-bf17-431a-80be-79ba66ba8a35","specVersion":"0.1","category":"Management","time":"2021-06-15T04:58:41.710Z","eventType":"CreateNamespace","resourceInfo":{"resourceType":"Namespace","cluster":"demo-sn-platform","tenant":"public","namespace":"audit_log"},"authenticationInfo":{"role":"admin"},"authorizationInfo":{"granted":true,"superUserAuthorization":true},"requestInfo":{"metadata":{"clientAddress":"10.225.14.43","uri":"/admin/v2/namespaces/public/audit_log","method":"PUT"}},"responseInfo":{"responseType":"SUCCESS","responseCode":204}}

Expose service outside k8s

StreamNative Platform provides an ingress feature to expose the following services:

  • Pulsar Proxy

  • StreamNative Console

  • Grafana

Each service is exposed through a different mechanism:

  • Pulsar Cluster -> Pulsar Proxy -> NodePort / LoadBalancer

  • StreamNative Console -> Nginx Ingress Controller

  • Grafana -> Nginx Ingress Controller

To enable ingress, change the following config in your helm values file.

## Ingresses for exposing Pulsar services
ingress:
  ## Ingresses for exposing pulsar service publicly
  proxy:
    enabled: true
    tls:
      # If you enable proxy tls, you should enable this too.
      enabled: true
    annotations:
      # If you use an AWS load balancer, it is recommended to add this
      service.beta.kubernetes.io/aws-load-balancer-type: nlb
  ## Ingresses for exposing monitoring/management services publicly
  controller:
    enabled: true
    annotations: 
      # If you use an AWS load balancer, it is recommended to add this
      service.beta.kubernetes.io/aws-load-balancer-type: nlb
  control_center:
    enabled: true
    # Set external domain of the load balancer of ingress controller
    # external_domain: 
    # external_domain_scheme: https://
  • Apply change
helm upgrade -f demo.yaml demo $PULSAR_CHART/ -n pulsar-demo

Get proxy-ingress access url/ip

kubectl get svc/demo-sn-platform-proxy-ingress -n pulsar-demo

Then you can access the pulsar cluster through the proxy-ingress HOST/IP. The respective ports are 8080/6650 for non-TLS access and 443/6651 for TLS access.

  • Change conf/client.conf to connect to proxy
webServiceUrl=http://[EXTERNAL_IP]:8080
brokerServiceUrl=pulsar://[EXTERNAL_IP]:6650/
bin/pulsar-admin clusters list
bin/pulsar-client produce public/default/test-topic-1 -m test-message -n 10

Get nginx-ingress

kubectl get svc/demo-sn-platform-nginx-ingress-controller -n pulsar-demo

You can access StreamNative Console and Grafana through the nginx-ingress-controller HOST/IP. The respective port is 80.

  • StreamNative Console URL is http://[nginx-ingress-HOST]/
  • Grafana URL is http://[nginx-ingress-HOST]/grafana

Enable TLS on pulsar proxy

  • Generate the cert files following this instruction

    • It is better to do this on a Linux server so that the right openssl version is used.
  • Create secret

kubectl create secret generic demo-sn-platform-proxy-tls -n pulsar-demo \
  --from-file=tls.crt=$(pwd)/cert/broker.cert.pem \
  --from-file=tls.key=$(pwd)/cert/broker.key-pk8.pem \
  --from-file=ca.crt=$(pwd)/cert/ca.cert.pem
  • Enable TLS on PulsarProxy
tls:
  enabled: true
  proxy:
    enabled: true
proxy:
  tlsSecretName: "demo-sn-platform-proxy-tls"

ingress:
  proxy:
    enabled: true
    tls:
      enabled: true
  • Apply Change
helm upgrade  -f demo.yaml demo $PULSAR_CHART/
  • Get proxy URL from proxy-ingress service
kubectl get svc/demo-sn-platform-proxy-ingress -n pulsar-demo
  • Change your client config conf/client.conf
webServiceUrl=https://[PROXY_INGRESS_ADDRESS]/
brokerServiceUrl=pulsar+ssl://[PROXY_INGRESS_ADDRESS]:6651/
tlsAllowInsecureConnection=false
tlsEnableHostnameVerification=false
tlsTrustCertsFilePath=[PATH_TO]/ca.cert.pem
  • Use pulsar-client to produce and consume data to check if all works well
 bin/pulsar-client produce public/default/test-topic-1 -m test-message -n 10
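You can also sanity-check the TLS listener itself with openssl before debugging at the client level (substitute your real ingress address):

openssl s_client -connect [PROXY_INGRESS_ADDRESS]:6651 -CAfile [PATH_TO]/ca.cert.pem </dev/null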

Enable KoP Access outside k8s

Install istio

- https://istio.io/latest/docs/setup/install/istioctl/
curl -L https://istio.io/downloadIstio | sh -
cd istio-*
wget https://raw.githubusercontent.com/streamnative/examples/master/platform/istio.yaml
bin/istioctl install -y -f istio.yaml 

This will expose port 9093 on the istio ingress gateway service.
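You can confirm the gateway service and its ports with:

kubectl get svc/istio-ingressgateway -n istio-system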

Generate certificates for KoP if you wish to access KoP from outside k8s

-  https://github.com/streamnative/kop/blob/master/docs/security.md#ssl-connection

Note: You must set the same password for server.keystore.jks and server.truststore.jks currently.
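As a minimal sketch of what the linked doc walks through (aliases, validity, and file names here are illustrative; use the same password when prompted for both stores):

# generate the server keystore
keytool -keystore server.keystore.jks -alias server -validity 365 -keyalg RSA -genkey
# create a CA and import its cert into the server truststore
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert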

Create secret for KoP certs

kubectl create secret generic kop-secret -n pulsar-demo \
  --from-file=keystore.jks=$(pwd)/cert/server.keystore.jks \
  --from-file=truststore.jks=$(pwd)/cert/server.truststore.jks

Create secret for KoP cert password

kubectl create secret generic kop-keystore-password --from-literal=password=passwd -n pulsar-demo

Enable KoP TLS in the config and apply

  • Change the config in demo.yaml
  advertisedDomain: "kop.sn.dev"
  kop:
    enabled: true
    tls:
      enabled: true
      # create a secret with keystore.jks and truststore.jks for kop tls security
      certSecretName: "kop-secret"
      # create a secret for keystore and truststore cert
      passwordSecretRef:
        key: password
        name: kop-keystore-password
  istio:
    gateway:
      selector:
        # set your istio gateway label here
        istio: "ingressgateway"
  • Apply
helm upgrade -f demo.yaml demo $PULSAR_CHART/ -n pulsar-demo

Test KoP Features

  • Create a kop service account, and a kop tenant with a kafka namespace
  • Grant produce/consume permissions to the kop role on the kop/kafka namespace
bin/pulsar-admin namespaces grant-permission kop/kafka --actions produce,consume --role kop
  • Get External IP of istio gateway service
kubectl get svc/istio-ingressgateway -n istio-system
  • Set a wildcard DNS record resolving the advertisedDomain in demo.yaml to the external IP above. If you don't have a DNS server, you can add entries like the ones below to your local /etc/hosts.
    [External IP] kop.sn.dev demo-sn-platform-broker-0.kop.sn.dev demo-sn-platform-broker-1.kop.sn.dev demo-sn-platform-broker-2.kop.sn.dev
    
  • Customize your kafka client config client-ssl.properties
security.protocol=SASL_SSL
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=client
ssl.endpoint.identification.algorithm=
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kop/kafka" password="token:[YOUR_TOKEN]";

We use the client cert from the cert directory.
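If you still need to build client.truststore.jks, it can be created by importing the same CA cert used on the server side (file names follow the keytool sketch earlier):

keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert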

  • Create the test topic and start a kafka consumer
cd [KAFKA_HOME]
bin/kafka-topics.sh --bootstrap-server kop.sn.dev:9093 --command-config client-ssl.properties --topic test-pt20 --create --partitions 20 --replication-factor 1

bin/kafka-console-consumer.sh --bootstrap-server kop.sn.dev:9093 --topic test-pt20 --consumer.config client-ssl.properties
  • Start kafka producer
bin/kafka-console-producer.sh --broker-list kop.sn.dev:9093 --topic test-pt20 --producer.config client-ssl.properties
  • Produce some messages and check that the kafka consumer prints them out.

The messages will be routed to kop/kafka/test-pt20, since the SASL username kop/kafka is used as the target tenant/namespace.

Try Function Mesh

  • List available sources and sinks
kubectl exec -n pulsar-demo -it demo-sn-platform-toolset-0 -- bash
bin/pulsar-admin sinks available-sinks
bin/pulsar-admin sources available-sources
  • List current sources; there should be no sources yet
bin/pulsar-admin sources list
  • Create a source named data-generator-source which produces sample data to a pulsar topic
bin/pulsar-admin sources create --name data-generator-source --source-type data-generator --destination-topic-name persistent://public/default/random-data-topic --custom-runtime-options '{"outputTypeClassName": "org.apache.pulsar.io.datagenerator.Person"}' --source-config '{"sleepBetweenMessages": "5000"}'
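To watch the generated data, you can attach a consumer to the output topic (the subscription name is arbitrary):

bin/pulsar-client consume persistent://public/default/random-data-topic -s data-gen-check -n 10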


# sink demo (replace the credential placeholders with your own keys)
bin/pulsar-admin sinks create --name sqs-sink-demo --sink-type sqs --inputs persistent://public/default/random-data-topic --sink-config '{"awsCredentialPluginName":"","awsCredentialPluginParam":"{\"accessKey\": \"[YOUR_ACCESS_KEY]\",\"secretKey\": \"[YOUR_SECRET_KEY]\"}","awsEndpoint":"https://sqs.cn-north-1.amazonaws.com.cn","awsRegion":"cn-north-1","queueName":"sqs-connector-test"}' --classname org.apache.pulsar.ecosystem.io.sqs.SQSSink --auto-ack true

  • List sources and sinks, and get detailed info
bin/pulsar-admin sources list
bin/pulsar-admin sources get --name data-generator-source
bin/pulsar-admin sources status --name data-generator-source


bin/pulsar-admin sinks list
bin/pulsar-admin sinks get --name sqs-sink-demo
bin/pulsar-admin sinks status --name sqs-sink-demo
  • Find resources in k8s
kubectl get sources --all-namespaces

kubectl get sources data-generator-source-a8b3aa24 -n pulsar-demo -o yaml
  • Check the random-data-topic in pulsar
  • Delete the source and sink
bin/pulsar-admin sources delete --name data-generator-source
bin/pulsar-admin sources list

bin/pulsar-admin sinks delete --name sqs-sink-demo
bin/pulsar-admin sinks list

Uninstall

  • Uninstall cluster
helm uninstall demo
kubectl delete ns pulsar-demo
  • Uninstall StreamNative Platform
helm list -n sn-system
helm uninstall cert-manager -n sn-system 
helm uninstall function-mesh -n sn-system
helm uninstall pulsar-operator -n sn-system
helm uninstall vault-operator -n sn-system
  • Uninstall istio
bin/istioctl x uninstall --purge -y