helm repo add streamnative https://charts.streamnative.io
helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com
helm repo add jetstack https://charts.jetstack.io
helm repo update
kubectl create namespace sn-system
export NAMESPACE=sn-system
helm upgrade --install vault-operator banzaicloud-stable/vault-operator -n $NAMESPACE
helm upgrade --install cert-manager jetstack/cert-manager -n $NAMESPACE --set installCRDs=true
helm upgrade --install pulsar-operator streamnative/pulsar-operator -n $NAMESPACE
helm upgrade --install function-mesh streamnative/function-mesh-operator -n $NAMESPACE
kubectl get po -n $NAMESPACE
If you can't access the original image using in above operators, you can use your own image.
helm inspect values banzaicloud-stable/vault-operator > value_vault_operator.yaml
Change below image settings in value_vault_operator.yaml
image:
bankVaultsRepository: ghcr.io/banzaicloud/bank-vaults
repository: ghcr.io/banzaicloud/vault-operator
# tag: ""
pullPolicy: IfNotPresent
imagePullSecrets: [] # global.imagePullSecrets is also supported
For example, If you wish to use image in docker hub, you can change as below.
image:
bankVaultsRepository: banzaicloud/bank-vaults
repository: banzaicloud/vault-operator
# tag: ""
pullPolicy: IfNotPresent
imagePullSecrets: [] # global.imagePullSecrets is also supported
Then install vault operator with below command.
helm upgrade --install -f value_vault_operator.yaml vault-operator banzaicloud-stable/vault-operator -n sn-system
helm inspect values jetstack/cert-manager > value_cert-manager.yaml
Change below image settings in value_cert-manager.yaml
image:
repository: quay.io/jetstack/cert-manager-controller
# You can manage a registry with
# registry: quay.io
# repository: jetstack/cert-manager-controller
# Override the image tag to deploy by setting this variable.
# If no value is set, the chart's appVersion will be used.
# tag: canary
# Setting a digest will override any tag
# digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20
pullPolicy: IfNotPresent
image:
repository: quay.io/jetstack/cert-manager-webhook
# You can manage a registry with
# registry: quay.io
# repository: jetstack/cert-manager-webhook
# Override the image tag to deploy by setting this variable.
# If no value is set, the chart's appVersion will be used.
# tag: canary
# Setting a digest will override any tag
# digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20
pullPolicy: IfNotPresent
image:
repository: quay.io/jetstack/cert-manager-cainjector
# You can manage a registry with
# registry: quay.io
# repository: jetstack/cert-manager-cainjector
# Override the image tag to deploy by setting this variable.
# If no value is set, the chart's appVersion will be used.
# tag: canary
# Setting a digest will override any tag
# digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20
pullPolicy: IfNotPresent
For example, If you wish to use image in docker hub, you can change as below.
image:
repository: streamnative/cert-manager-controller
# You can manage a registry with
# registry: quay.io
# repository: jetstack/cert-manager-controller
# Override the image tag to deploy by setting this variable.
# If no value is set, the chart's appVersion will be used.
tag: v1.4.0
# Setting a digest will override any tag
# digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20
pullPolicy: IfNotPresent
image:
repository: streamnative/cert-manager-webhook
# You can manage a registry with
# registry: quay.io
# repository: jetstack/cert-manager-webhook
# Override the image tag to deploy by setting this variable.
# If no value is set, the chart's appVersion will be used.
tag: v1.4.0
# Setting a digest will override any tag
# digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20
pullPolicy: IfNotPresent
image:
repository: streamnative/cert-manager-cainjector
# You can manage a registry with
# registry: quay.io
# repository: jetstack/cert-manager-cainjector
# Override the image tag to deploy by setting this variable.
# If no value is set, the chart's appVersion will be used.
tag: v1.4.0
# Setting a digest will override any tag
# digest: sha256:0e072dddd1f7f8fc8909a2ca6f65e76c5f0d2fcfb8be47935ae3457e8bbceb20
pullPolicy: IfNotPresent
Then install cert manager with below command.
helm upgrade --install -f value_cert-manager.yaml cert-manager jetstack/cert-manager -n sn-system --set installCRDs=true
helm inspect values streamnative/pulsar-operator > value_pulsar-operator.yaml
Change below image settings in value_pulsar-operator.yaml
images:
zookeeper:
repository: streamnative/zookeeper-operator
tag: v0.7.0-rc5
pullPolicy: IfNotPresent
bookkeeper:
repository: streamnative/bookkeeper-operator
tag: v0.6.12
pullPolicy: IfNotPresent
pulsar:
repository: streamnative/pulsar-operator
tag: 0.7.0-rc6
pullPolicy: IfNotPresent
Then install cert manager with below command.
helm upgrade --install pulsar-operator -f value_pulsar-operator.yaml streamnative/pulsar-operator -n sn-system
helm inspect values streamnative/function-mesh-operator > value_function-mesh-operator.yaml
Change below image settings in value_function-mesh-operator.yaml
operatorImage: streamnative/function-mesh:v0.1.6-rc1
Then install cert manager with below command.
helm upgrade --install function-mesh -f value_function-mesh-operator.yaml streamnative/function-mesh-operator -n sn-system
- Change
demo.yamlto what you expect for you cluster
wget https://raw.githubusercontent.com/streamnative/examples/master/platform/values_cluster.yaml -O demo.yaml
- Create namespace
kubectl create namespace pulsar-demo
- Create demo cluster using helm command
# online version
helm install -f demo.yaml demo streamnative/sn-platform --set initialize=true
# offline version
helm install -f demo.yaml demo $PULSAR_CHART/ --set initialize=true
kubectl get po -n pulsar-demo
kubectl port-forward demo-sn-platform-streamnative-console-0 9527:9527 -n pulsar-demo
- visit http://localhost:9527 and the default username is admin. Get password for admin with following command.
kubectl get secret demo-sn-platform-vault-console-admin-passwd -o=jsonpath='{.data.password}' -n pulsar-demo | base64 --decode; echo
- create a new service account on streamnative console and copy its token
kubectl port-forward svc/demo-sn-platform-grafana 3000 -n pulsar-demo
kubectl port-forward service/demo-sn-platform-proxy-headless 6650 -n pulsar-demo
kubectl port-forward service/demo-sn-platform-proxy-headless 8080 -n pulsar-demo
- Change directory your PULSAR_HOME and change below values of
conf/client.conf
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:[YOUR_SERVICE_ACCOUNT_TOKEN]
- Start pulsar consumer
bin/pulsar-client consume public/default/test-topic-1 -s pulsar-demo -n 0
- Start pulsar producer
bin/pulsar-client produce public/default/test-topic-1 -m test-message -n 1000 -r 5
- Check if messages are produces and consumed correctly
- open streamnative console to discover producers,consumers & peek messages
kubectl exec -n pulsar-demo -it demo-sn-platform-toolset-0 -- bash
bin/pulsar-admin clusters list
bin/pulsar-client produce public/default/test-topic-1 -m test-message -n 1
export TOKEN=ZmRkZmVmNmItOWFlMi01ZjU0LTBlMjgtMjk4NmUyZmM2ZDJlOmQ1NTYxZmIyLWU4MGItNmYyNi1hMzQ0LTA1M2I3MmFhYTY3MA==
bin/pulsarctl \
--admin-service-url http://demo-sn-platform-broker:8080 \
--token "$TOKEN" \
clusters list
kubectl run kafka --rm -it -n pulsar-demo --image bitnami/kafka -- bash
export TOKEN=NDkzNjVkOTctYWY0NS0xYjNiLWI1NGYtNTgyZmYxOTcwOGIzOjNkNjdhMDViLTA1MzgtYzVlZi1mYzI4LWMxYzEyNmIyYmVmYQ==
kafka-console-producer.sh \
--producer-property security.protocol=SASL_PLAINTEXT \
--producer-property sasl.mechanism=PLAIN \
--producer-property 'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="public/default" password="token:'$TOKEN'";' \
--broker-list demo-sn-platform-broker:9092 --topic test-kop-1
kubectl exec -ti -n pulsar-demo kafka -- bash
kafka-console-consumer.sh \
--consumer-property security.protocol=SASL_PLAINTEXT \
--consumer-property sasl.mechanism=PLAIN \
--consumer-property 'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="public/default" password="token:'$TOKEN'";' \
--bootstrap-server demo-sn-platform-broker:9092 --topic test-kop-1
kubectl exec -n pulsar-demo -it demo-sn-platform-toolset-0 -- bash
bin/pulsar-client consume public/default/test-kop-1 -s pulsar-demo -n 0
- Change replica count of broker/proxy/bookie/zookeeper
- Run following command to apply change
# online version
helm upgrade -f demo.yaml demo streamnative/sn-platform
# offline version
helm upgrade -f demo.yaml demo $PULSAR_CHART/
kubectl get statefulset -n pulsar-demo
kubectl rollout restart statefulset/demo-sn-platform-broker -n pulsar-demo
kubectl rollout status statefulset/demo-sn-platform-broker -n pulsar-demo
watch 'kubectl get po -n pulsar-demo|grep broker'
TODO: only one pod restart ? why?
Add config under broker.configData node in demo.yaml
# For audit log
PULSAR_PREFIX_brokerInterceptors: "audit-log"
PULSAR_PREFIX_brokerInterceptorsDirectory: "./interceptors"
PULSAR_PREFIX_snAuditLogConfig: >
{"defaultTopics":{"allowed":"persistent://sn/system/audit_log_all","denied":"persistent://sn/system/audit_log_all"}}
Upgrade cluster
helm upgrade -f demo.yaml demo $PULSAR_CHART/
Create a new namespace under public tenant
kubectl exec -n pulsar-demo -it demo-sn-platform-toolset-0 -- bash
bin/pulsar-admin namespaces create public/audit_log
Check audit log in topic persistent://sn/system/audit_log_all
bin/pulsar-admin topics peek-messages -n 1 -s audit persistent://sn/system/audit_log_allowed
bin/pulsar-admin topics peek-messages -n 1 -s audit persistent://sn/system/audit_log_denied
There will be an audit log like following one.
{"id":"11c5296d-bf17-431a-80be-79ba66ba8a35","specVersion":"0.1","category":"Management","time":"2021-06-15T04:58:41.710Z","eventType":"CreateNamespace","resourceInfo":{"resourceType":"Namespace","cluster":"demo-sn-platform","tenant":"public","namespace":"audit_log"},"authenticationInfo":{"role":"admin"},"authorizationInfo":{"granted":true,"superUserAuthorization":true},"requestInfo":{"metadata":{"clientAddress":"10.225.14.43","uri":"/admin/v2/namespaces/public/audit_log","method":"PUT"}},"responseInfo":{"responseType":"SUCCESS","responseCode":204}}
StreamNative Platform provide ingress feature to expose following service:
-
Pulsar Proxy
-
StreamNative Console
-
Grafana
-
Pulsar Cluster -> Pulsar Proxy -> NodePort / LoadBalancer
-
StreamNative Console -> Nginx Ingress Controller
-
Grafana -> Nginx Ingress Controller
To enable ingress, you can change following config in your helm values file.
## Ingresses for exposing Pulsar services
ingress:
## Ingresses for exposing pulsar service publicly
proxy:
enabled: true
tls:
# If you enable proxy tls, you should enable this too.
enabled: true
annotations:
# If you use aws lb, recommend to add this
service.beta.kubernetes.io/aws-load-balancer-type: nlb
## Ingresses for exposing monitoring/management services publicly
controller:
enabled: true
annotations:
# If you use aws lb, recommend to add this
service.beta.kubernetes.io/aws-load-balancer-type: nlb
control_center:
enabled: true
# Set external domain of the load balancer of ingress controller
# external_domain:
# external_domain_scheme: https://
- Apply change
helm upgrade -f demo.yaml demo $PULSAR_CHART/
Get proxy-ingress access url/ip
kubectl get svc/demo-sn-platform-proxy-ingress -n pulsar-demo
Then you can access pulsar cluster through proxy-ingress HOST/IP. The relative port should be 8080/6650 for non-tls access and 443/6651 for tls access.
- Change conf/client.conf to connect to proxy
webServiceUrl=http://[EXTERNAL_IP]:8080
brokerServiceUrl=pulsar://[EXTERNAL_IP]:6650/
bin/pulsar-admin clusters list
bin/pulsar-client produce public/default/test-topic-1 -m test-message -n 10
Get nginx-ingress
kubectl get svc/demo-sn-platform-nginx-ingress-controller -n pulsar-demo
You can access streamnative console and grafana through nginx-ingress-controller HOST/IP. The relative port is 80.
- StreamNative Console URL is http://[nginx-ingress-HOST]/
- Grafana URL is http://[nginx-ingress-HOST]/grafana
-
Generate the cert files following this instruction
- It is better on your linux server for using right openssl version.
-
Create secret
kubectl create secret generic demo-sn-platform-proxy-tls -n pulsar-demo \
--from-file=tls.crt=$(PWD)/cert/broker.cert.pem \
--from-file=tls.key=$(PWD)/cert/broker.key-pk8.pem \
--from-file=ca.crt=$(PWD)/cert/ca.cert.pem
- Enable TLS on PulsarProxy
tls:
enabled: true
proxy:
enabled: true
proxy:
tlsSecretName: "demo-sn-platform-proxy-tls"
ingress:
proxy:
enabled: true
tls:
enabled: true
- Apply Change
helm upgrade -f demo.yaml demo $PULSAR_CHART/
- Get proxy URL from proxy-ingress service
kubectl get svc/demo-sn-platform-proxy-ingress -n pulsar-demo
- Change your client config
config/client.conf
webServiceUrl=https://[PROXY_INGRESS_ADDRESS]/
brokerServiceUrl=pulsar+ssl://[PROXY_INGRESS_ADDRESS]:6651/
tlsAllowInsecureConnection=false
tlsEnableHostnameVerification=false
tlsTrustCertsFilePath=[PATH_TO]/ca.cert.pem
- Use pulsar-client to produce and consume data to check if all works well
bin/pulsar-client produce public/default/test-topic-1 -m test-message -n 10
- https://istio.io/latest/docs/setup/install/istioctl/
curl -L https://istio.io/downloadIstio | sh -
cd istio-*
wget https://raw.githubusercontent.com/streamnative/examples/master/platform/istio.yaml
bin/istioctl install -y -f istio.yaml
This will expose
9093port for istio service.
- https://github.com/streamnative/kop/blob/master/docs/security.md#ssl-connection
Note: You must set the same password for server.keystore.jks and server.truststore.jks currently.
kubectl create secret generic kop-secret -n pulsar-demo --from-file=keystore.jks=$(PWD)/cert/server.keystore.jks --from-file=truststore.jks=$(PWD)/cert/server.truststore.jks
kubectl create secret generic kop-keystore-password --from-literal=password=passwd -n pulsar-demo
- Change config
advertisedDomain: "kop.sn.dev"
kop:
enabled: true
tls:
enabled: true
# create a secret with keystore.jks and truststore.jks for kop tls security
certSecretName: "kop-secret"
# create a secret for keystore and truststore cert
passwordSecretRef:
key: password
name: kop-keystore-password
istio:
gateway:
selector:
# set your istio gateway label here
istio: "ingressgateway"
- Apply
helm upgrade ...
- Create a kop service account and kop tenant with kafka namespace
- Grant produce/consume permission to
koprole onkop/kafkanamespace
bin/pulsar-admin namespaces grant-permission kop/kafka --actions produce,consume --role kop
- Get External IP of istio gateway service
kubectl get svc/istio-ingressgateway -n istio-system
- Set wildcard dns resolution for
advertisedDomainindemo.yamlto the above external IP. If you don't have one dns server, you can set relative settings in your local/etc/hostslike below.[External IP] kop.sn.dev demo-sn-platform-broker-0.kop.sn.dev demo-sn-platform-broker-1.kop.sn.dev demo-sn-platform-broker-2.kop.sn.dev - Customize your kafka client config
client-ssl.properties
security.protocol=SASL_SSL
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=client
ssl.endpoint.identification.algorithm=
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kop/kafka" password="token:[YOUR_TOKEN]";
We use client cert in
certdirectory.
- Start kafka consumer
cd [KAFKA_HOME]
bin/kafka-topics.sh --bootstrap-server kop.sn.dev:9093 --command-config client-ssl.properties --topic test-pt20 --create --partitions 20 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server messaging.snp.aws.streamnative.dev:9093 --command-config client-public.properties --topic test-pt20 --create --partitions 20 --replication-factor 1
bin/kafka-console-consumer.sh --bootstrap-server kop.sn.dev:9093 --topic test-pt20 --consumer.config client-ssl.properties
- Start kafka producer
bin/kafka-console-producer.sh --broker-list kop.sn.dev:9093 --topic test-pt20 --producer.config client-ssl.properties
- Create some messages and check if kafka consumer prints them out.
The messages will be routed to kop/kafka/test-pt20 as we specify the username to kop/kafka.
- List available sources and sinks
kubectl exec -n pulsar-demo -it demo-sn-platform-toolset-0 -- bash
bin/pulsar-admin sinks available-sinks
bin/pulsar-admin sources available-sources
- List current sources ans there should be no sources now
bin/pulsar-admin sources list
- Create one source named data-generator which will produce sample data to pulsar topic
bin/pulsar-admin sources create --name data-generator-source --source-type data-generator --destination-topic-name persistent://public/default/random-data-topic --custom-runtime-options '{"outputTypeClassName": "org.apache.pulsar.io.datagenerator.Person"}' --source-config '{"sleepBetweenMessages": "5000"}'
# sink demo
bin/pulsar-admin sinks create --name sqs-sink-demo --sink-type sqs --inputs persistent://public/default/random-data-topic --sink-config '{"awsCredentialPluginName":"","awsCredentialPluginParam":"{\"accessKey\": \"AKIASEBAUITMXLJG5YGM\",\"secretKey\": \"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\"}","awsEndpoint":"https://sqs.cn-north-1.amazonaws.com.cn","awsRegion":"cn-north-1","queueName":"sqs-connector-test"}' --classname org.apache.pulsar.ecosystem.io.sqs.SQSSink --auto-ack true
- List source list and get detailed info
bin/pulsar-admin sources list
bin/pulsar-admin sources get --name data-generator-source
bin/pulsar-admin sources status --name data-generator-source
bin/pulsar-admin sinks list
bin/pulsar-admin sinks get --name sqs-sink-demo
bin/pulsar-admin sinks status --name sqs-sink-demo
- Find resources in k8s
kubectl get sources --all-namespaces
kubectl get sources data-generator-source-a8b3aa24 -n pulsar-demo -o yaml
- Check
random-*topic in pulsar - Delete source
bin/pulsar-admin sources delete --name data-generator-source
bin/pulsar-admin sources list
bin/pulsar-admin sinks delete --name sqs-005
bin/pulsar-admin sinks list
- Uninstall cluster
helm uninstall demo
kubectl delete ns pulsar-demo
- Uninstall StreamNative Platform
helm list -n sn-system
helm uninstall cert-manager -n sn-system
helm uninstall function-mesh -n sn-system
helm uninstall pulsar-operator -n sn-system
helm uninstall vault-operator -n sn-system
- Uninstall istio
bin/istioctl x uninstall --purge -y