-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathPascalZMQ.pas
2184 lines (1816 loc) · 61.1 KB
/
PascalZMQ.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
unit PascalZMQ;
{ Pascal version of the various CZMQ helpers }
{$DEFINE ZMQ_VERIFY_HASH}
{ Converting from ZMQ/CZMQ to PascalZMQ:
ZMQ/CZMQ --> PascalZMQ
-------------------------------------
zctx_new TZContext.Create
zctx_destroy TZContext.Free
zframe_new TZFrame.Initialize
zframe_destroy TZFrame.Free
zframe_dup TZFrame.Clone
zframe_size TZFrame.Size
zframe_data TZFrame.Data
TZFrame.ToBytes
zframe_eq TZFrame.Equals
zframe_strhex TZFrame.ToHexString
zmsg_new TZMessage.Create
zmsg_destroy TZMessage.Free
zmsg_dup TZMessage.Clone
zmsg_size TZMessage.FrameCount
zmsg_first TZMessage.First
TZMessage.Frames[0]
zmsg_next TZMessage.Next
TZMessage.Frames[I]
zmsg_prepend TZMessage.Push
(new) TZMessage.PushEmptyFrame
zmsg_pushmem TZMessage.PushInteger
TZMessage.PushSingle
TZMessage.PushDouble
TZMessage.PushEnum
TZMessage.PushString
TZMessage.PushBytes
TZMessage.PushProtocolBuffer
zmsg_pop TZMessage.Pop
TZMessage.PopInteger
TZMessage.PopSingle
TZMessage.PopDouble
TZMessage.PopEnum
TZMessage.PopString
TZMessage.PopBytes
TZMessage.PopProtocolBuffer
(new) TZMessage.Peek
TZMessage.PeekInteger
TZMessage.PeekSingle
TZMessage.PeekDouble
TZMessage.PeekEnum
TZMessage.PeekString
TZMessage.PeekBytes
TZMessage.PeekProtocolBuffer
zmsg_unwrap TZMessage.Unwrap
zmsg_send TZSocket.Send
zmsg_receive TZSocket.Receive
zsocket_new TZSocket.Create
zsocket_destroy TZSocket.Free
zsocket_bind TZSocket.Bind
zsocket_connect/zmq_connect TZSocket.Connect
zmq_disconnect TZSocket.Disconnect
zmq_poll TZSocket.Poll
zsocket_set_linger TZSocket.Linger
zsocket_set_curve_server TZSocket.IsCurveServer
zsocket_set_curve_serverkey TZSocket.CurveServerKey
zcert_new TZCertificate.Create()
zcert_load TZCertificate.Create(Filename)
zcert_destroy TZCertificate.Free
zcert_save TZCertificate.Save
zcert_apply TZCertificate.Apply
}
{$INCLUDE 'Grijjy.inc'}
interface
uses
System.Types,
System.Classes,
System.SysUtils,
System.SyncObjs,
System.Generics.Collections,
ZMQ.API;
type
{ Exception class for ZMQ errors }
EZMQError = class(Exception);
type
{ Types of sockets }
TZSocketType = (
Pair = ZMQ_PAIR,
Pub = ZMQ_PUB,
Sub = ZMQ_SUB,
Req = ZMQ_REQ,
Rep = ZMQ_REP,
Dealer = ZMQ_DEALER,
Router = ZMQ_ROUTER,
Pull = ZMQ_PULL,
Push = ZMQ_PUSH,
XPub = ZMQ_XPUB,
XSub = ZMQ_XSUB,
Stream = ZMQ_STREAM);
type
{ Poll events }
TZSocketPoll = (
Input = ZMQ_POLLIN,
Output = ZMQ_POLLOUT);
type
{ Poll results }
TZSocketPollResult = (Available, Timeout, Interrupted);
type
{ Type of public and secret keys for CURVE certificates }
TZCurveKey = array [0..31] of Byte;
type
{ ZMQ context.
Modeled after CZMQ zctx: wraps a ZMQ context and manages its sockets }
TZContext = class(TObject)
{$REGION 'Internal Declarations'}
private const
{ TODO : Make these configurable? }
SND_HWM = 10000; { High Water Mark for outbound messages. 0=unlimited }
RCV_HWM = 10000; { High Water Mark for inbound messages. 0=unlimited }
LINGER = 0;
private
FContext: Pointer;
FLock: TCriticalSection;
FSockets: TList<Pointer>;
FIsShadow: Boolean;
private
procedure CreateContext;
function CreateSocket(const AType: TZSocketType): Pointer;
procedure FreeSocket(const ASocket: Pointer);
{$ENDREGION 'Internal Declarations'}
public
{ Create a new context }
constructor Create;
{ Free the context }
destructor Destroy; override;
{ Creates a new shadow context based on this context }
function Shadow: TZContext;
end;
type
PZFrame = ^TZFrame;
{ A single message frame in a multi-part TZMessage. }
TZFrame = record
{$REGION 'Internal Declarations'}
private
FMsg: zmq_msg_t;
function GetSize: Integer; inline;
function GetData: Pointer; inline;
private
function Send(const ASocketHandle: Pointer; const AMoreFollows: Boolean): Boolean;
function Receive(const ASocketHandle: Pointer): Boolean; inline;
{$ENDREGION 'Internal Declarations'}
public
{ Creates an empty frame. }
class function Create: PZFrame; overload; static;
{ Creates a frame with existing data.
Parameters:
AData: the data of the frame. The frame will create a copy of the data,
so you can release the data afterwards.
ASize: the data size for the frame. }
class function Create(const AData; const ASize: Integer): PZFrame; overload; static;
{ Creates a frame with existing data.
Parameters:
AData: a byte array containing the data of the frame. The frame will
create a copy of the data, so you can release the data afterwards. }
class function Create(const AData: TBytes): PZFrame; overload; static; inline;
{ Destroys the frame (and any data it holds). Frames automatically get
destroyed when you send them (see TZSocket.Send). }
procedure Free;
{ Creates a copy of the frame.
Returns:
A newly allocated copy of the frame. You are responsible for freeing
it by either calling Free explicitly or sending the copy. }
function Clone: PZFrame;
{ Returns True if two frames have identical size and data }
function Equals(const AOther: PZFrame): Boolean;
{ Copies the data in the frame to an array of bytes.
Returns:
A byte array with the copy of the data in the frame. }
function ToBytes: TBytes;
{ Return the data encoded as a printable hex string. Useful for ZeroMQ
UUIDs. }
function ToHexString: String;
{ Returns the contents of the frame as a 4-byte integer value.
Raises an exception if the frame does not contain a 4-byte value. }
function AsInteger: Integer;
{ Returns the contents of the frame as a 4-byte single-precision
floating-point value.
Raises an exception if the frame does not contain a 4-byte value. }
function AsSingle: Single;
{ Returns the contents of the frame as a 8-byte double-precision
floating-point value.
Raises an exception if the frame does not contain a 8-byte value. }
function AsDouble: Double;
{ Returns the contents of the frame as a UTF-8 decoded Unicode string.
Raises an exception if the frame does not contain valid UTF-8 data. }
function AsString: String;
{ The size of the frame in bytes }
property Size: Integer read GetSize;
{ Pointer to the data in the frame }
property Data: Pointer read GetData;
end;
type
PZMessage = ^TZMessage;
{ A multi-part message. }
TZMessage = record
{$REGION 'Internal Declarations'}
private const
{ Maximum number of frames that can be stored inside a message without
having to dynamically allocate a list. Once this limit has been reached,
we switch to a dynamic list. The vast majority of message will be below
this limit, which gives a performance boost. The maximum number of frames
in any message used by the unit tests is 8. }
STATIC_FRAME_COUNT = 8;
private type
PPZFrame = ^PZFrame;
private
FStaticFrames: array [0..STATIC_FRAME_COUNT - 1] of PZFrame;
FFrames: PPZFrame; { Points to either FStaticFrames, or is dynamically allocated }
FCapacity: Integer;
FCount: Integer;
FCursor: Integer;
private
function GetFrame(const AIndex: Integer): PZFrame; inline;
private
function Send(const ASocketHandle: Pointer; const ASocketType: TZSocketType): Boolean;
function Receive(const ASocketHandle: Pointer; const ASocketType: TZSocketType): Boolean;
{$ENDREGION 'Internal Declarations'}
public
{ Creates a new empty message }
class function Create: PZMessage; static;
{ Destroys the message (and any frames it owns). Messages automatically get
destroyed when you send them (see TZSocket.Send). }
procedure Free;
{ Creates a copy of the message.
Returns:
A newly allocated copy of the message. You are responsible for freeing
it by either calling Free explicitly or sending the copy. }
function Clone: PZMessage;
{ Returns the first frame in the message, or nil if there are no frames.
Use in combination with Next to walk the frames in the message. }
function First: PZFrame;
{ Returns the next frame in the message, or nil if there are no frames left.
Use in combination with First to walk the frames in the message. }
function Next: PZFrame;
public
{ Pushing frames }
{ Pushes a frame to the @bold(front) of the message. The message becomes
owner of the frame and will destroy it when the message is sent or
destroyed.
Parameters:
AFrame: the frame to push onto the message. Cannot be nil.
AFrame will be set to nil after this call }
procedure Push(var AFrame: PZFrame);
{ Pushes an empty frame to the @bold(front) of the message. The message
becomes owner of the frame and will destroy it when the message is sent or
destroyed. Empty frames are used as delimiters by some topologies. }
procedure PushEmptyFrame;
{ Pushes a new frame to the @bold(front) of the message, with a 4-byte
integer value.
Parameters:
AValue: the value to push. }
procedure PushInteger(const AValue: Integer); inline;
{ Pushes a new frame to the @bold(front) of the message, with a 4-byte
single-precision floating-point value.
Parameters:
AValue: the value to push. }
procedure PushSingle(const AValue: Single); inline;
{ Pushes a new frame to the @bold(front) of the message, with a 8-byte
double-precision floating-point value.
Parameters:
AValue: the value to push. }
procedure PushDouble(const AValue: Double); inline;
{ Pushes a new frame to the @bold(front) of the message, with an
enumerated value.
Parameters:
AValue: the value to push. }
procedure PushEnum<T: record>(const AValue: T); overload; inline;
{ Pushes a new frame to the @bold(front) of the message, with an
enumerated value.
Parameters:
AEnumValue: the ordinal value of the enum to push. Use Ord() for this.
AEnumSize: the size of the enumeration type. Use SizeOf() for this. }
procedure PushEnum(const AEnumValue, AEnumSize: Integer); overload; inline;
{ Pushes a new frame to the @bold(front) of the message, with a Unicode
string value. The string will be encoded in UTF-8 format.
Parameters:
AValue: the value to push. }
procedure PushString(const AValue: String);
{ Pushes a new frame to the @bold(front) of the message, with a byte array.
Parameters:
AValue: the value to push. }
procedure PushBytes(const AValue: TBytes); inline;
{ Pushes a new frame to the @bold(front) of the message, with a given
memory buffer.
Parameters:
AValue: the memory buffer to push.
ASize: the size of the memory buffer }
procedure PushMemory(const AValue; const ASize: Integer); inline;
{ Pushes a new frame to the @bold(front) of the message, with a record in
Google Protocol Buffer format. The record type must be attributed/
registered for use with protocol buffers.
Parameters:
AValue: the value to push. }
procedure PushProtocolBuffer<T: record>(const AValue: T); overload;
procedure PushProtocolBuffer(const ARecordType: Pointer; const AValue); overload;
public
{ Popping frames }
{ Pops a frame of the @bold(front) of the message. The caller owns the frame
now and must destroy it when finished with it.
Returns:
The popped frame or nil if there are no frames left to pop. }
function Pop: PZFrame;
{ Pops a frame of the @bold(front) of the message and converts it to a
4-byte integer value. The popped frame will be destroyed.
Returns:
The integer value of the frame, or 0 if there are no frames left to pop.
Raises an exception if the frame does not contain a 4-byte value. }
function PopInteger: Integer;
{ Pops a frame of the @bold(front) of the message and converts it to a
4-byte single-precision floating-point value. The popped frame will be
destroyed.
Returns:
The floating-point value of the frame, or 0 if there are no frames left
to pop.
Raises an exception if the frame does not contain a 4-byte value. }
function PopSingle: Single;
{ Pops a frame of the @bold(front) of the message and converts it to a
8-byte double-precision floating-point value. The popped frame will be
destroyed.
Returns:
The floating-point value of the frame, or 0 if there are no frames left
to pop.
Raises an exception if the frame does not contain a 8-byte value. }
function PopDouble: Double;
{ Pops a frame of the @bold(front) of the message and converts it to an
enumerated value. The popped frame will be destroyed.
Returns:
The enum value of the frame, or the enum with ordinal value 0 if there
are no frames left to pop.
Raises an exception if the frame does not contain a value the size of T. }
function PopEnum<T: record>: T; overload;
{ Pops a frame of the @bold(front) of the message and converts it to an
enumerated value. The popped frame will be destroyed.
Parameters:
AEnumSize: the size of the enumeration type. Use SizeOf() for this.
Returns:
The ordinal enum value of the frame, or 0 if there are no frames left.
Raises an exception if the frame does not contain a value the size of
AEnumSize. }
function PopEnum(const AEnumSize: Integer): Integer; overload;
{ Pops a frame of the @bold(front) of the message and UTF-8 decodes it to a
Unicode string. The popped frame will be destroyed.
Returns:
The string value of the frame, or an empty string if there are no frames
left to pop.
Raises an exception if the frame does not contain valid UTF-8 data. }
function PopString: String;
{ Pops a frame of the @bold(front) of the message and returns its data as a
byte array. The popped frame will be destroyed.
Returns:
A byte array with the data of the frame, or nil if there are no frames
left to pop. }
function PopBytes: TBytes;
{ Pops a frame of the @bold(front) of the message and deserializes it into a
record in Google Protocol Buffer format. The record type must be
attributed/registered for use with protocol buffers. The popped frame will
be destroyed.
Parameters:
AValue: is filled with the deserialized protocol buffer.
Returns:
False if there were no frames left to pop, or True otherwise.
Raises an exception if the data in the frame is not in valid protocol
buffer format }
function PopProtocolBuffer<T: record>(out AValue: T): Boolean; overload;
function PopProtocolBuffer(const ARecordType: Pointer; out AValue): Boolean; overload;
public
{ Peeking frames }
{ Peeks at the frame at the @bold(front) of the message. It will @bold(not)
be removed from the message, and the message still owns it. So you should
@bold(not) free the returned frame.
Returns:
The peeked frame or nil if there are no frames left. }
function Peek: PZFrame; inline;
{ Peeks at the frame at the @bold(front) of the message and converts it to a
4-byte integer value. The frame will @bold(not) be removed from the
message.
Returns:
The integer value of the frame, or 0 if there are no frames left.
Raises an exception if the frame does not contain a 4-byte value. }
function PeekInteger: Integer;
{ Peeks at the frame at the @bold(front) of the message and converts it to a
4-byte single-precision floating-point value. The frame will @bold(not) be
removed from the message.
Returns:
The floating-point value of the frame, or 0 if there are no frames left.
Raises an exception if the frame does not contain a 4-byte value. }
function PeekSingle: Single;
{ Peeks at the frame at the @bold(front) of the message and converts it to a
8-byte double-precision floating-point value. The frame will @bold(not) be
removed from the message.
Returns:
The floating-point value of the frame, or 0 if there are no frames left
to pop.
Raises an exception if the frame does not contain a 8-byte value. }
function PeekDouble: Double;
{ Peeks at the frame at the @bold(front) of the message and converts it to
an enumerated value. The frame will @bold(not) be removed from the
message.
Returns:
The enum value of the frame, or the enum with ordinal value 0 if there
are no frames left.
Raises an exception if the frame does not contain a value the size of T. }
function PeekEnum<T: record>: T; overload;
{ Peeks at the frame at the @bold(front) of the message and converts it to
an enumerated value. The frame will @bold(not) be removed from the
message.
Parameters:
AEnumSize: the size of the enumeration type. Use SizeOf() for this.
Returns:
The ordinal enum value of the frame, or 0 if there are no frames left.
Raises an exception if the frame does not contain a value the size of
AEnumSize. }
function PeekEnum(const AEnumSize: Integer): Integer; overload;
{ Peeks at the frame at the @bold(front) of the message and UTF-8 decodes it
to a Unicode string. The frame will @bold(not) be removed from the
message.
Returns:
The string value of the frame, or an empty string if there are no frames
left.
Raises an exception if the frame does not contain valid UTF-8 data. }
function PeekString: String;
{ Peeks at the frame at the @bold(front) of the message and returns its data
as a byte array. The frame will @bold(not) be removed from the message.
Returns:
A byte array with the data of the frame, or nil if there are no frames
left. }
function PeekBytes: TBytes;
{ Peeks at the frame at the @bold(front) of the message and deserializes it
into a record in Google Protocol Buffer format. The record type must be
attributed/registered for use with protocol buffers. The frame will
@bold(not) be removed from the message.
Parameters:
AValue: is filled with the deserialized protocol buffer.
Returns:
False if there were no frames left, or True otherwise.
Raises an exception if the data in the frame is not in valid protocol
buffer format }
function PeekProtocolBuffer<T: record>(out AValue: T): Boolean; overload;
function PeekProtocolBuffer(const ARecordType: Pointer; out AValue): Boolean; overload;
public
{ Misc }
{ Pops a frame of the @bold(front) of the message. The caller owns the frame
now and must destroy it when finished with it.
If the next frame is empty, then that frame is popped and destroyed.
Returns:
The top frame or nil if there are no frames. }
function Unwrap: PZFrame;
public
{ Properties }
{ The number of frames in the message }
property FrameCount: Integer read FCount;
{ The frames in the message.
Parameters:
AIndex: the index of the frame to return. Use 0 for the first frame
at the front of the message. No range checking is performed on the
index. }
property Frames[const AIndex: Integer]: PZFrame read GetFrame;
end;
type
{ Ultra-thin (record) wrapper around ZMQ socket.
Modeled after CZMQ socket }
TZSocket = record
{$REGION 'Internal Declarations'}
private const
DYN_PORT_START = $C000;
DYN_PORT_END = $FFFF;
private
FHandle: Pointer;
FSocketType: TZSocketType;
function GetLinger: Integer;
procedure SetLinger(const Value: Integer);
function GetIsCurveServer: Boolean;
procedure SetIsCurveServer(const Value: Boolean);
{$ENDREGION 'Internal Declarations'}
public
{ Creates a socket.
Parameters:
AContext: the context to use to create the socket. Cannot be nil.
AType: the type of socket to create.
Raises an exception if the socket could not be created. }
constructor Create(const AContext: TZContext; const AType: TZSocketType);
{ Destroys the socket.
Parameters:
AContext: the context to use to destroy the socket. Cannot be nil. }
procedure Free(const AContext: TZContext); inline;
{ Binds the socket to an endpoint. For tcp:// endpoints, supports ephemeral
ports, if you specify the port number as "*". In that case, it uses the
IANA designated range from C000 (49152) to FFFF (65535).
Parameters:
AEndpoint: the endpoint to bind to.
Examples:
* tcp://127.0.0.1:5500 bind to port 5500
* tcp://127.0.0.1:* bind to first free port from C000 to FFFF
Returns:
On success, returns the actual port number used, for tcp:// endpoints,
and 0 for other transports. On failure, returns -1. Note that when using
ephemeral ports, a port may be reused by different services without
clients being aware. Protocols that run on ephemeral ports should take
this into account. }
function Bind(const AEndpoint: String): Integer;
{ Unbinds a socket from an endpoint.
Parameters:
AEndpoint: the endpoint to unbind from.
Returns:
True on success, False on failure. }
function Unbind(const AEndpoint: String): Boolean;
{ Connects the socket to an endpoint.
Parameters:
AEndpoint: the endpoint to connect to.
Returns:
True on success, False on error. }
function Connect(const AEndpoint: String): Boolean;
{ Disconnects the socket from an endpoint.
Parameters:
AEndpoint: the endpoint to disconnect from.
Returns:
True on success, False on error. }
function Disconnect(const AEndpoint: String): Boolean;
{ Polls the socket for input or output.
Parameters:
AEvent: the event to poll:
* TZSocketPoll.Input: when the function returns
TZSocketPollResult.Available, then at least one message may be
received from the socket without blocking.
* TZSocketPoll.Output: when the function returns
TZSocketPollResult.Available, then at least one message may be
sent to the socket without blocking.
ATimeout: the number of microseconds to wait for the event to occur. Set
to 0 to return immediately. Set to -1 to wait indefinitely.
Returns:
* TZSocketPollResult.Available: the event has occurred within the
timeout, and you can receive or send a message without blocking.
* TZSocketPollResult.Timeout: the event did not occur within the timeout.
* TZSocketPollResult.Interrupted: the operation was interrupted by
delivery of a signal before any events were available. }
function Poll(const AEvent: TZSocketPoll; const ATimeout: Integer): TZSocketPollResult;
{ Sends a message and frees it at some point.
Parameters:
AMessage: the message to send. Cannot be nil. Will be set to nil
afterwards.
Returns:
True on success, False on error or when the message has no frames.
The message may not be send immediately, but be queued. After it has been
send, it will be destroyed automatically, so there is no need to call
AMessage.Free. }
function Send(var AMessage: PZMessage): Boolean;
{ Receives a message from a socket. This is a blocking operation.
Parameters:
ASocket: the socket used to receive the message.
Returns:
The received message or nil if an error occured (eg. the operation was
interrupted).
@bold(Note): this function returns a newly allocated message.
You @bold(must) free it when you are done with it. }
function Receive: PZMessage;
{ Linger timeout. Number of msecs to flush when closing socket. }
property Linger: Integer read GetLinger write SetLinger;
{ Whether the socket will act as server for CURVE security. A value of True
means the socket will act as CURVE server. A value of False means the
socket will not act as CURVE server, and its security role then depends on
other option settings. Setting this to False shall reset the socket
security to nil. When you set this you must also set the server's secret
key using CurveSecretKey. A server socket does not need to know its own
public key. }
property IsCurveServer: Boolean read GetIsCurveServer write SetIsCurveServer;
{ Sets the socket's long term public key. You must set this on CURVE client
sockets. You provide the key as a 40-character string encoded in the Z85
encoding format. The public key must always be used with the matching
secret key.
Parameters:
AKey: the public key to set }
procedure SetCurvePublicKey(const AKey: String); overload;
{ Sets the socket's long term public key. You must set this on CURVE client
sockets. You provide the key as a 32-byte binary key. The public key must
always be used with the matching secret key.
Parameters:
AKey: the public key to set }
procedure SetCurvePublicKey(const AKey: TZCurveKey); overload;
{ Sets the socket's long term secret key. You must set this on both CURVE
client and server sockets. You provide the key as a 40-character string
encoded in the Z85 encoding format.
Parameters:
AKey: the public key to set }
procedure SetCurveSecretKey(const AKey: String); overload;
{ Sets the socket's long term secret key. You must set this on both CURVE
client and server sockets. You provide the key as a 32-byte binary key.
Parameters:
AKey: the public key to set }
procedure SetCurveSecretKey(const AKey: TZCurveKey); overload;
{ Sets the socket's long term server key. You must set this on CURVE client
sockets. You provide the key as a 40-character string encoded in the Z85
encoding format. This key must have been generated together with the
server's secret key.
Parameters:
AKey: the public key to set }
procedure SetCurveServerKey(const AKey: String); overload;
{ Sets the socket's long term server key. You must set this on CURVE client
sockets. You provide the key as a 32-byte binary key. This key must have
been generated together with the server's secret key.
Parameters:
AKey: the public key to set }
procedure SetCurveServerKey(const AKey: TZCurveKey); overload;
end;
type
{ Security certificates for the ZMQ CURVE mechanism }
TZCertificate = class(TObject)
{$REGION 'Internal Declarations'}
private const
FORTY_ZEROES = '0000000000000000000000000000000000000000';
private
FPublicKey: TZCurveKey;
FSecretKey: TZCurveKey;
FPublicTxt: String;
FSecretTxt: String;
{$ENDREGION 'Internal Declarations'}
public
{ Creates a new certificate with a newly generated random keypair }
constructor Create; overload;
{ Loads a certificate from a (JSON) file.
Parameters:
AFilename: the name of the file to load the certificate from.
If there is both a "public" as a "secret" certificate file, then the
"secret" file (with a "_secret" suffix) will be used. }
constructor Create(const AFilename: String); overload;
{ Saves the certificate to a "public" and "secret" file.
Parameters:
AFilename: the name of the "public" file to save the certificate to.
The "public" file will only store the public key. This method also creates
a "secret" file (with a "_secret" suffix) with both the public and secret
key. The "secret" file should not be provided to others. }
procedure Save(const AFilename: String);
{ Applies the certificate to a socket, for use with CURVE security on the
socket. If the certificate was loaded from a "public" file, then the
secret key will be undefined, and this certificate will not work
successfully.
Parameters:
ASocket: the socket to apply the certificate to }
procedure Apply(const ASocket: TZSocket);
{ The public key of the certificate }
property PublicKey: TZCurveKey read FPublicKey;
{ The secret key of the certificate }
property SecretKey: TZCurveKey read FSecretKey;
{ The public key as a 40-character Z85-encoded string }
property PublicTxt: String read FPublicTxt;
{ The secret key as a 40-character Z85-encoded string }
property SecretTxt: String read FSecretTxt;
end;
resourcestring
RS_ZMQ_ERROR_CREATING_SOCKET = 'Error creating ZMQ socket.';
RS_ZMQ_UNEXPECTED_FRAME = 'Unexpected frame in ZMQ message.';
RS_ZMQ_CANNOT_LOAD_CERTIFICATE = 'Unable to load certificate from file %s.';
implementation
{$IF Defined(WIN32) and Defined(DEBUG)}
{.$DEFINE CUSTOM_MALLOC}
{$ENDIF}
uses
{$IFDEF CUSTOM_MALLOC}
Winapi.Windows,
Winapi.ImageHlp,
{$ENDIF}
{$IFDEF ZMQ_VERIFY_HASH}
Grijjy.Hash,
{$ENDIF}
Grijjy.ProtocolBuffers,
Grijjy.Bson;
{$POINTERMATH ON}
{$IFDEF CUSTOM_MALLOC}
var
Finalized: Boolean = False;
function HookedMalloc(Size: Integer): Pointer; cdecl;
begin
GetMem(Result, Size);
end;
function HookedRealloc(MemBlock: Pointer; Size: Integer): Pointer; cdecl;
begin
ReallocMem(MemBlock, Size);
Result := MemBlock;
end;
procedure HookedFree(MemBlock: Pointer); cdecl;
begin
if (not Finalized) then
FreeMem(MemBlock);
end;
procedure ZMQInitializeDebugMode;
var
Process: THandle;
Module: HModule;
OrigMalloc, OrigRealloc, OrigFree, NewProc: Pointer;
Proc: PPointer;
IAT: PImageImportDescriptor;
Thunk: PImageThunkData;
C, OldProtect: Cardinal;
U: NativeUInt;
P: PAnsiChar;
S: String;
begin
{ ZMQ uses malloc, realloc and free for managing memory. In DEBUG mode on
Windows, we want to use Delphi's memory manager (or FastMM) instead to debug
memory issues. We do this by hooking these routines. We can do this because
libzmq.dll uses msvcr120.dll for these routines. The import address table
(IAT) of libzmq.dll has a section for each DLL it imports. We look for the
imports of msvcr120.dll. We replace the table entries for malloc, realloc
and free with our own versions. (We are only interested in malloc calls from
inside libzmq.dll, so we don't patch any other import tables).
First, we need to get the addresses of the original functions in
msvcr120.dll so we can look them up in the import table of libzmq.dll. }
Module := GetModuleHandle('msvcr120.dll');
if (Module = 0) then
Exit;
OrigMalloc := GetProcAddress(Module, 'malloc');
OrigRealloc := GetProcAddress(Module, 'realloc');
OrigFree := GetProcAddress(Module, 'free');
if (OrigMalloc = nil) or (OrigRealloc = nil) or (OrigFree = nil) then
Exit;
{ Now, load the import address table of libzmq.dll }
Module := GetModuleHandle('libzmq.dll');
if (Module = 0) then
Exit;
IAT := ImageDirectoryEntryToData(Pointer(Module), True,
IMAGE_DIRECTORY_ENTRY_IMPORT, C);
if (IAT = nil) then
Exit;
Process := GetCurrentProcess;
{ Enumerate all modules in the IAT until we find msvcr120.dll }
while (IAT.Name <> 0) do
begin
P := PAnsiChar(PByte(Module) + IAT.Name);
S := String(AnsiString(P));
if SameText(S, 'msvcr120.dll') then
begin
{ Now enumerate all functions in the IAT until we find one of the
functions we want to hook }
Thunk := PImageThunkData(PByte(Module) + IAT.FirstThunk);
while (Thunk._Function <> 0) do
begin
Proc := @Thunk._Function;
if (Proc^ = OrigMalloc) then
begin
NewProc := @HookedMalloc;
OrigMalloc := nil;
end
else if (Proc^ = OrigRealloc) then
begin
NewProc := @HookedRealloc;
OrigRealloc := nil;
end
else if (Proc^ = OrigFree) then
begin
NewProc := @HookedFree;
OrigFree := nil;
end
else
NewProc := nil;
{ Hook the function }
if Assigned(NewProc) then
begin
if (not WriteProcessMemory(Process, Proc, @NewProc, SizeOf(NewProc), U))
and (GetLastError = ERROR_NOACCESS) then
begin
if (VirtualProtect(Proc, SizeOf(NewProc), PAGE_WRITECOPY, OldProtect)) then
begin
WriteProcessMemory(GetCurrentProcess, Proc, @NewProc, SizeOf(NewProc), U);
VirtualProtect(Proc, SizeOf(NewProc), OldProtect, OldProtect);
end;
end;
end;
Inc(Thunk);
end;
Break;
end;
Inc(IAT);
end;
{ All 3 functions should be hooked now, and the original functions should be
set to nil }
Assert((OrigMalloc = nil) and (OrigRealloc = nil) and (OrigFree = nil));
end;
procedure ZMQFinalizeDebugMode;
begin
Finalized := True;
end;
{$ELSE}
procedure ZMQInitializeDebugMode;
begin
{ Nothing }
end;
procedure ZMQFinalizeDebugMode;
begin